From f4a4824ec6d2c19c06b42dce6c479b4df8398f76 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 13 Jul 2022 11:46:49 +0200 Subject: [PATCH] Remove OTel executor service workaround. Signed-off-by: Ales Justin --- pom.xml | 11 +- .../io/strimzi/kafka/bridge/Application.java | 6 +- .../config/BridgeExecutorServiceFactory.java | 103 ------------------ 3 files changed, 6 insertions(+), 114 deletions(-) delete mode 100644 src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java diff --git a/pom.xml b/pom.xml index 1d7c7c582..04986ed92 100644 --- a/pom.xml +++ b/pom.xml @@ -315,6 +315,11 @@ opentelemetry-exporter-jaeger ${opentelemetry.version} + + io.vertx + vertx-opentelemetry + ${vertx.version} + io.grpc @@ -409,12 +414,6 @@ ${vertx.version} test - - io.vertx - vertx-opentelemetry - ${vertx.version} - test - org.mockito mockito-core diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 9bcd62ef0..2249af991 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -8,7 +8,6 @@ import io.micrometer.core.instrument.MeterRegistry; import io.strimzi.kafka.bridge.amqp.AmqpBridge; import io.strimzi.kafka.bridge.config.BridgeConfig; -import io.strimzi.kafka.bridge.config.BridgeExecutorServiceFactory; import io.strimzi.kafka.bridge.http.HttpBridge; import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.config.ConfigRetriever; @@ -19,7 +18,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.impl.VertxBuilder; import io.vertx.core.json.JsonObject; import io.vertx.micrometer.Label; import io.vertx.micrometer.MetricsDomain; @@ -74,9 +72,7 @@ public static void main(String[] args) { vertxOptions.setMetricsOptions(metricsOptions()); jmxCollectorRegistry = getJmxCollectorRegistry(); } - VertxBuilder vertxBuilder = new VertxBuilder(vertxOptions) - .executorServiceFactory(new BridgeExecutorServiceFactory()); - Vertx vertx = vertxBuilder.init().vertx(); + Vertx vertx = Vertx.vertx(vertxOptions); // MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance MeterRegistry meterRegistry = BackendRegistries.getDefaultNow(); MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry); diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java deleted file mode 100644 index cfd520446..000000000 --- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ - -package io.strimzi.kafka.bridge.config; - -import io.strimzi.kafka.bridge.tracing.TracingUtil; -import io.vertx.core.spi.ExecutorServiceFactory; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Custom Vertx ExecutorServiceFactory - delegate to tracing impl to provide one. - * Initially it could be NoopTracingHandle that provides it - before Application is fully initialized, - * then it should be actual tracing implementation - if there is one. - * Shutdown should be done OK, since tracing delegate will also delegate shutdown to original, - * or original itself will be used. - */ -public class BridgeExecutorServiceFactory implements ExecutorServiceFactory { - @Override - public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) { - ExecutorService original = ExecutorServiceFactory.INSTANCE.createExecutor(threadFactory, concurrency, maxConcurrency); - return new ExecutorService() { - private ExecutorService delegate() { - return TracingUtil.getTracing().adapt(original); - } - - @Override - public void shutdown() { - delegate().shutdown(); - } - - @Override - public List shutdownNow() { - return delegate().shutdownNow(); - } - - @Override - public boolean isShutdown() { - return delegate().isShutdown(); - } - - @Override - public boolean isTerminated() { - return delegate().isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return delegate().awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) { - return delegate().submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return delegate().submit(task, result); - } - - @Override - public Future submit(Runnable task) { - return delegate().submit(task); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return delegate().invokeAll(tasks); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return delegate().invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return delegate().invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return delegate().invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - delegate().execute(command); - } - }; - } -}