Skip to content

Commit

Permalink
Remove OTel span propagation hack.
Browse files Browse the repository at this point in the history
Signed-off-by: Ales Justin <[email protected]>
  • Loading branch information
alesj committed Jul 11, 2022
1 parent c7c1ec6 commit e20ddb5
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 84 deletions.
6 changes: 5 additions & 1 deletion src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.strimzi.kafka.bridge.amqp.AmqpBridge;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.BridgeExecutorServiceFactory;
import io.strimzi.kafka.bridge.http.HttpBridge;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.config.ConfigRetriever;
Expand All @@ -18,6 +19,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.json.JsonObject;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.MetricsDomain;
Expand Down Expand Up @@ -72,7 +74,9 @@ public static void main(String[] args) {
vertxOptions.setMetricsOptions(metricsOptions());
jmxCollectorRegistry = getJmxCollectorRegistry();
}
Vertx vertx = Vertx.vertx(vertxOptions);
VertxBuilder vertxBuilder = new VertxBuilder(vertxOptions)
.executorServiceFactory(new BridgeExecutorServiceFactory());
Vertx vertx = vertxBuilder.init().vertx();
// MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance
MeterRegistry meterRegistry = BackendRegistries.getDefaultNow();
MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.kafka.bridge.config;

import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.spi.ExecutorServiceFactory;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Custom Vertx ExecutorServiceFactory - delegate to tracing impl to provide one.
* Initially it could be NoopTracingHandle that provides it - before Application is fully initialized,
* then it should be actual tracing implementation - if there is one.
* Shutdown should be done OK, since tracing delegate will also delegate shutdown to original,
* or original itself will be used.
*/
public class BridgeExecutorServiceFactory implements ExecutorServiceFactory {
@Override
public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
ExecutorService original = ExecutorServiceFactory.INSTANCE.createExecutor(threadFactory, concurrency, maxConcurrency);
return new ExecutorService() {
private ExecutorService delegate() {
ExecutorService service = TracingUtil.getTracing().get(original);
return service == null ? original : service;
}

@Override
public void shutdown() {
delegate().shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate().shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate().isShutdown();
}

@Override
public boolean isTerminated() {
return delegate().isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate().awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate().submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate().submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate().submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate().invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate().invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate().invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate().invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
delegate().execute(command);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ public void handle(Endpoint<?> endpoint) {
if (isAsync) {
// if async is specified, return immediately once records are sent
for (KafkaProducerRecord<K, V> record : records) {
span.prepare(record);
Promise<RecordMetadata> promise = Promise.promise();
promise.future().onComplete(ar -> span.clean(record));
this.send(record, promise);
this.send(record, null);
}
span.finish(HttpResponseStatus.NO_CONTENT.code());
HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(),
Expand All @@ -136,9 +133,8 @@ public void handle(Endpoint<?> endpoint) {
List<Future> sendHandlers = new ArrayList<>(records.size());
for (KafkaProducerRecord<K, V> record : records) {
Promise<RecordMetadata> promise = Promise.promise();
Future<RecordMetadata> future = promise.future().onComplete(ar -> span.clean(record));
Future<RecordMetadata> future = promise.future();
sendHandlers.add(future);
span.prepare(record);
this.send(record, promise);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,14 @@
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.RoutingContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT;
import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER;
Expand All @@ -52,6 +46,7 @@
class OpenTelemetryHandle implements TracingHandle {

private Tracer tracer;
private ExecutorService service;

static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) {
builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE);
Expand Down Expand Up @@ -87,6 +82,14 @@ public void initialize() {
AutoConfiguredOpenTelemetrySdk.initialize();
}

@Override
public synchronized ExecutorService get(ExecutorService provided) {
if (service == null) {
service = Context.taskWrapping(provided);
}
return service;
}

private Tracer get() {
if (tracer == null) {
tracer = GlobalOpenTelemetry.getTracer(COMPONENT);
Expand Down Expand Up @@ -188,7 +191,7 @@ public SpanHandle<K, V> span(RoutingContext routingContext) {

@Override
public void addTracingPropsToProducerConfig(Properties props) {
TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ContextAwareTracingProducerInterceptor.class.getName());
TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
}

private static final class OTelSpanHandle<K, V> implements SpanHandle<K, V> {
Expand All @@ -200,32 +203,6 @@ public OTelSpanHandle(Span span) {
this.scope = span.makeCurrent();
}

/**
* See ContextAwareTracingProducerInterceptor for more info.
*
* @param record Kafka producer record to use as payload
*/
@Override
public void prepare(KafkaProducerRecord<K, V> record) {
String uuid = UUID.randomUUID().toString();
SPANS.put(uuid, span);
record.addHeader(X_UUID, uuid);
}

/**
* See ContextAwareTracingProducerInterceptor for more info.
*
* @param record Kafka producer record to use as payload
*/
@Override
public void clean(KafkaProducerRecord<K, V> record) {
Optional<KafkaHeader> oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst();
oh.ifPresent(h -> {
String uuid = h.value().toString();
SPANS.remove(uuid);
});
}

@Override
public void inject(KafkaProducerRecord<K, V> record) {
propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader);
Expand All @@ -247,31 +224,4 @@ public void finish(int code) {
}
}
}

static final String X_UUID = "_UUID";
static final Map<String, Span> SPANS = new ConcurrentHashMap<>();

/**
* This interceptor is a workaround for async message send.
* OpenTelemetry propagates current span via ThreadLocal,
* where we have an async send - different thread.
* So we need to pass-in the current span info via SPANS map.
* ProducerRecord is a bit abused as a payload / info carrier,
* it holds an unique UUID, which maps to current span in SPANS map.
*
* @param <K> key type
* @param <V> value type
*/
public static class ContextAwareTracingProducerInterceptor<K, V> extends TracingProducerInterceptor<K, V> {
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
Headers headers = record.headers();
String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString();
headers.remove(X_UUID);
Span span = SPANS.remove(key);
try (Scope ignored = span.makeCurrent()) {
return super.onSend(record);
}
}
}
}
16 changes: 0 additions & 16 deletions src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,6 @@
* Span handle, an abstraction over actual span implementation.
*/
public interface SpanHandle<K, V> {
/**
* Prepare Kafka producer record before async send.
*
* @param record Kafka producer record to use as payload
*/
default void prepare(KafkaProducerRecord<K, V> record) {
}

/**
* Clean Kafka producer record after async send.
*
* @param record Kafka producer record used as payload
*/
default void clean(KafkaProducerRecord<K, V> record) {
}

/**
* Inject tracing info into underlying span from Kafka producer record.
*
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutorService;

/**
* Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry.
Expand All @@ -35,6 +36,17 @@ public interface TracingHandle {
*/
void initialize();

/**
* Get custom executor service if needed.
* Else return null.
*
* @param service current executor service
* @return custom executor service or null
*/
default ExecutorService get(ExecutorService service) {
return null;
}

/**
* Build span builder handle.
*
Expand Down

0 comments on commit e20ddb5

Please sign in to comment.