From e20ddb5d7d7ab45b58b18575086ad2ebdc510d8b Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Mon, 11 Jul 2022 13:24:12 +0200 Subject: [PATCH] Remove OTel span propagation hack. Signed-off-by: Ales Justin --- .../io/strimzi/kafka/bridge/Application.java | 6 +- .../config/BridgeExecutorServiceFactory.java | 104 ++++++++++++++++++ .../bridge/http/HttpSourceBridgeEndpoint.java | 8 +- .../bridge/tracing/OpenTelemetryHandle.java | 72 ++---------- .../kafka/bridge/tracing/SpanHandle.java | 16 --- .../kafka/bridge/tracing/TracingHandle.java | 12 ++ 6 files changed, 134 insertions(+), 84 deletions(-) create mode 100644 src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 2249af991..9bcd62ef0 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -8,6 +8,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.strimzi.kafka.bridge.amqp.AmqpBridge; import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.strimzi.kafka.bridge.config.BridgeExecutorServiceFactory; import io.strimzi.kafka.bridge.http.HttpBridge; import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.config.ConfigRetriever; @@ -18,6 +19,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; +import io.vertx.core.impl.VertxBuilder; import io.vertx.core.json.JsonObject; import io.vertx.micrometer.Label; import io.vertx.micrometer.MetricsDomain; @@ -72,7 +74,9 @@ public static void main(String[] args) { vertxOptions.setMetricsOptions(metricsOptions()); jmxCollectorRegistry = getJmxCollectorRegistry(); } - Vertx vertx = Vertx.vertx(vertxOptions); + VertxBuilder vertxBuilder = new VertxBuilder(vertxOptions) + .executorServiceFactory(new BridgeExecutorServiceFactory()); + Vertx vertx = vertxBuilder.init().vertx(); // MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance MeterRegistry meterRegistry = BackendRegistries.getDefaultNow(); MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry); diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java new file mode 100644 index 000000000..85a33d2a8 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java @@ -0,0 +1,104 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.config; + +import io.strimzi.kafka.bridge.tracing.TracingUtil; +import io.vertx.core.spi.ExecutorServiceFactory; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Custom Vertx ExecutorServiceFactory - delegate to tracing impl to provide one. + * Initially it could be NoopTracingHandle that provides it - before Application is fully initialized, + * then it should be actual tracing implementation - if there is one. + * Shutdown should be done OK, since tracing delegate will also delegate shutdown to original, + * or original itself will be used. + */ +public class BridgeExecutorServiceFactory implements ExecutorServiceFactory { + @Override + public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) { + ExecutorService original = ExecutorServiceFactory.INSTANCE.createExecutor(threadFactory, concurrency, maxConcurrency); + return new ExecutorService() { + private ExecutorService delegate() { + ExecutorService service = TracingUtil.getTracing().get(original); + return service == null ? original : service; + } + + @Override + public void shutdown() { + delegate().shutdown(); + } + + @Override + public List shutdownNow() { + return delegate().shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate().isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate().isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate().awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return delegate().submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate().submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate().submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate().invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return delegate().invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate().invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate().invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate().execute(command); + } + }; + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index c5bf30240..c4db356ce 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -121,10 +121,7 @@ public void handle(Endpoint endpoint) { if (isAsync) { // if async is specified, return immediately once records are sent for (KafkaProducerRecord record : records) { - span.prepare(record); - Promise promise = Promise.promise(); - promise.future().onComplete(ar -> span.clean(record)); - this.send(record, promise); + this.send(record, null); } span.finish(HttpResponseStatus.NO_CONTENT.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), @@ -136,9 +133,8 @@ public void handle(Endpoint endpoint) { List sendHandlers = new ArrayList<>(records.size()); for (KafkaProducerRecord record : records) { Promise promise = Promise.promise(); - Future future = promise.future().onComplete(ar -> span.clean(record)); + Future future = promise.future(); sendHandlers.add(future); - span.prepare(record); this.send(record, promise); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index d56b5b8cd..0efcf2b86 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -23,20 +23,14 @@ import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.strimzi.kafka.bridge.config.BridgeConfig; -import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.RoutingContext; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducerRecord; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Headers; import java.util.Map; -import java.util.Optional; import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; @@ -52,6 +46,7 @@ class OpenTelemetryHandle implements TracingHandle { private Tracer tracer; + private ExecutorService service; static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); @@ -87,6 +82,14 @@ public void initialize() { AutoConfiguredOpenTelemetrySdk.initialize(); } + @Override + public synchronized ExecutorService get(ExecutorService provided) { + if (service == null) { + service = Context.taskWrapping(provided); + } + return service; + } + private Tracer get() { if (tracer == null) { tracer = GlobalOpenTelemetry.getTracer(COMPONENT); @@ -188,7 +191,7 @@ public SpanHandle span(RoutingContext routingContext) { @Override public void addTracingPropsToProducerConfig(Properties props) { - TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ContextAwareTracingProducerInterceptor.class.getName()); + TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); } private static final class OTelSpanHandle implements SpanHandle { @@ -200,32 +203,6 @@ public OTelSpanHandle(Span span) { this.scope = span.makeCurrent(); } - /** - * See ContextAwareTracingProducerInterceptor for more info. - * - * @param record Kafka producer record to use as payload - */ - @Override - public void prepare(KafkaProducerRecord record) { - String uuid = UUID.randomUUID().toString(); - SPANS.put(uuid, span); - record.addHeader(X_UUID, uuid); - } - - /** - * See ContextAwareTracingProducerInterceptor for more info. - * - * @param record Kafka producer record to use as payload - */ - @Override - public void clean(KafkaProducerRecord record) { - Optional oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst(); - oh.ifPresent(h -> { - String uuid = h.value().toString(); - SPANS.remove(uuid); - }); - } - @Override public void inject(KafkaProducerRecord record) { propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader); @@ -247,31 +224,4 @@ public void finish(int code) { } } } - - static final String X_UUID = "_UUID"; - static final Map SPANS = new ConcurrentHashMap<>(); - - /** - * This interceptor is a workaround for async message send. - * OpenTelemetry propagates current span via ThreadLocal, - * where we have an async send - different thread. - * So we need to pass-in the current span info via SPANS map. - * ProducerRecord is a bit abused as a payload / info carrier, - * it holds an unique UUID, which maps to current span in SPANS map. - * - * @param key type - * @param value type - */ - public static class ContextAwareTracingProducerInterceptor extends TracingProducerInterceptor { - @Override - public ProducerRecord onSend(ProducerRecord record) { - Headers headers = record.headers(); - String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString(); - headers.remove(X_UUID); - Span span = SPANS.remove(key); - try (Scope ignored = span.makeCurrent()) { - return super.onSend(record); - } - } - } } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java index c8ad09de4..c0cfdf092 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -12,22 +12,6 @@ * Span handle, an abstraction over actual span implementation. */ public interface SpanHandle { - /** - * Prepare Kafka producer record before async send. - * - * @param record Kafka producer record to use as payload - */ - default void prepare(KafkaProducerRecord record) { - } - - /** - * Clean Kafka producer record after async send. - * - * @param record Kafka producer record used as payload - */ - default void clean(KafkaProducerRecord record) { - } - /** * Inject tracing info into underlying span from Kafka producer record. * diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java index 3bcc189ec..d3b6bc329 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -10,6 +10,7 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import java.util.Properties; +import java.util.concurrent.ExecutorService; /** * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. @@ -35,6 +36,17 @@ public interface TracingHandle { */ void initialize(); + /** + * Get custom executor service if needed. + * Else return null. + * + * @param service current executor service + * @return custom executor service or null + */ + default ExecutorService get(ExecutorService service) { + return null; + } + /** * Build span builder handle. *