Add OpenTelemetry support #633

Merged (14 commits) on Aug 11, 2022
5 changes: 5 additions & 0 deletions bin/kafka_bridge_run.sh
@@ -13,4 +13,9 @@ fi
# Make sure that we use /dev/urandom
JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp/vertx-cache -Djava.security.egd=file:/dev/./urandom"

# default the OpenTelemetry traces exporter to Jaeger when a service name is set but no exporter is configured
if [ -n "$OTEL_SERVICE_NAME" ] && [ -z "$OTEL_TRACES_EXPORTER" ]; then
export OTEL_TRACES_EXPORTER="jaeger"
fi

exec java $JAVA_OPTS $KAFKA_BRIDGE_LOG4J_OPTS -classpath "${MYPATH}/../libs/*" io.strimzi.kafka.bridge.Application "$@"
5 changes: 4 additions & 1 deletion config/application.properties
@@ -1,7 +1,10 @@
#Bridge related settings
bridge.id=my-bridge
# uncomment the following line to enable Jaeger tracing, check the documentation how to configure the tracer
# uncomment one of the following lines (bridge.tracing) to enable Jaeger tracing; check the documentation on how to configure the tracer
# OpenTracing support
#bridge.tracing=jaeger
# OpenTelemetry support
#bridge.tracing=opentelemetry

#Apache Kafka common
kafka.bootstrap.servers=localhost:9092
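A note on the two values: `jaeger` keeps the existing OpenTracing-based support, while `opentelemetry` switches to the new stack, whose exporter is configured through the standard `OTEL_*` environment variables (see `bin/kafka_bridge_run.sh` above). Below is a minimal sketch of how the property value could be dispatched on, assuming the `bridgeConfig.getTracing()` accessor that appears later in this diff; the class itself is illustrative only.

```java
// Sketch only: selecting a tracing stack from the bridge.tracing property.
// bridgeConfig.getTracing() is the accessor used elsewhere in this PR;
// this class is not part of the codebase.
public class TracingSelectionSketch {
    public static String describe(String bridgeTracing) {
        if ("opentelemetry".equals(bridgeTracing)) {
            return "OpenTelemetry (new in this PR)";
        } else if ("jaeger".equals(bridgeTracing)) {
            return "OpenTracing with Jaeger (existing support)";
        }
        return "tracing disabled";   // property commented out
    }
}
```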
53 changes: 52 additions & 1 deletion pom.xml
@@ -83,7 +83,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<slf4j.version>1.7.21</slf4j.version>
<vertx.version>4.3.1</vertx.version>
<vertx.version>4.3.3</vertx.version>
<netty.version>4.1.77.Final</netty.version>
<kafka.version>3.2.0</kafka.version>
<qpid-proton.version>0.33.10</qpid-proton.version>
@@ -110,6 +110,8 @@
<jaeger.version>1.8.1</jaeger.version>
<opentracing.version>0.33.0</opentracing.version>
<opentracing-kafka-client.version>0.1.15</opentracing-kafka-client.version>
<opentelemetry.alpha-version>1.16.0-alpha</opentelemetry.alpha-version>
<opentelemetry.version>1.16.0</opentelemetry.version>
<micrometer.version>1.3.9</micrometer.version>
<jmx-prometheus-collector.version>0.12.0</jmx-prometheus-collector.version>
<prometheus-simpleclient.version>0.7.0</prometheus-simpleclient.version>
@@ -267,6 +269,46 @@
<artifactId>opentracing-kafka-client</artifactId>
<version>${opentracing-kafka-client.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-opentelemetry</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
@@ -349,6 +391,12 @@
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-opentracing</artifactId>
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -498,6 +546,9 @@
<ignoredUnusedDeclaredDependency>org.apache.logging.log4j:log4j-slf4j-impl</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.jaegertracing:jaeger-client</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>org.yaml:snakeyaml</ignoredUnusedDeclaredDependency> <!-- CVE override -->
<ignoredUnusedDeclaredDependency>org.apache.tomcat.embed:tomcat-embed-core</ignoredUnusedDeclaredDependency> <!-- CVE override -->
<!-- OpenTelemetry - used via classpath configuration -->
<ignoredUnusedDeclaredDependency>io.opentelemetry:opentelemetry-exporter-jaeger</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
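The `opentelemetry-sdk-extension-autoconfigure` artifact added above builds an SDK from the standard `OTEL_*` environment variables (the ones the start script now defaults), and `vertx-opentelemetry` plugs that SDK into Vert.x. The following is a hedged sketch of that wiring under those assumptions; the actual bootstrap code in this PR may differ.

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.tracing.opentelemetry.OpenTelemetryOptions;

public class OpenTelemetryBootstrapSketch {
    public static Vertx tracedVertx() {
        // Builds an SDK from OTEL_SERVICE_NAME, OTEL_TRACES_EXPORTER, OTEL_EXPORTER_* ...;
        // with the start-script default this resolves to the Jaeger exporter.
        OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
        // Hand the SDK to Vert.x so it can create and propagate spans.
        return Vertx.vertx(new VertxOptions().setTracingOptions(new OpenTelemetryOptions(otel)));
    }
}
```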
40 changes: 16 additions & 24 deletions src/main/java/io/strimzi/kafka/bridge/Application.java
@@ -5,13 +5,11 @@

package io.strimzi.kafka.bridge;

import io.jaegertracing.Configuration;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import io.strimzi.kafka.bridge.amqp.AmqpBridge;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.http.HttpBridge;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
@@ -30,6 +28,7 @@
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,7 +66,7 @@ public static void main(String[] args) {
try {
VertxOptions vertxOptions = new VertxOptions();
JmxCollectorRegistry jmxCollectorRegistry = null;
if (Boolean.valueOf(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
log.info("Metrics enabled and exposed on the /metrics endpoint");
// setup Micrometer metrics options
vertxOptions.setMetricsOptions(metricsOptions());
@@ -82,13 +81,13 @@ public static void main(String[] args) {
CommandLine commandLine = new DefaultParser().parse(generateOptions(), args);

ConfigStoreOptions fileStore = new ConfigStoreOptions()
.setType("file")
.setFormat("properties")
.setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true));
.setType("file")
.setFormat("properties")
.setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true));

ConfigStoreOptions envStore = new ConfigStoreOptions()
.setType("env")
.setConfig(new JsonObject().put("raw-data", true));
.setType("env")
.setConfig(new JsonObject().put("raw-data", true));

ConfigRetrieverOptions options = new ConfigRetrieverOptions()
.addStore(fileStore)
@@ -126,43 +125,36 @@ public static void main(String[] args) {
}
}

// register tracing support, if configured
TracingUtil.initialize(bridgeConfig);

// when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well,
// so no need for a standalone embedded HTTP server
if (!bridgeConfig.getHttpConfig().isEnabled()) {
EmbeddedHttpServer embeddedHttpServer =
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
embeddedHttpServer.start();
}

// register OpenTracing Jaeger tracer
if ("jaeger".equals(bridgeConfig.getTracing())) {
if (config.get(Configuration.JAEGER_SERVICE_NAME) != null) {
Tracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.registerIfAbsent(tracer);
} else {
log.error("Jaeger tracing cannot be initialized because {} environment variable is not defined", Configuration.JAEGER_SERVICE_NAME);
}
}
}
});
} else {
log.error("Error starting the bridge", ar.cause());
System.exit(1);
}
});
} catch (Exception ex) {
log.error("Error starting the bridge", ex);
} catch (RuntimeException | MalformedObjectNameException | IOException | ParseException e) {
log.error("Error starting the bridge", e);
System.exit(1);
}
}

/**
* Set up the Vert.x metrics options
*
*
* @return instance of the MicrometerMetricsOptions on Vert.x
*/
private static MicrometerMetricsOptions metricsOptions() {
Set<String> set = new HashSet();
Set<String> set = new HashSet<>();
set.add(MetricsDomain.NAMED_POOLS.name());
set.add(MetricsDomain.VERTICLES.name());
return new MicrometerMetricsOptions()
@@ -218,7 +210,7 @@ private static Future<HttpBridge> deployHttpBridge(Vertx vertx, BridgeConfig bri

if (bridgeConfig.getHttpConfig().isEnabled()) {
HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter);

vertx.deployVerticle(httpBridge, done -> {
if (done.succeeded()) {
log.info("HTTP verticle instance deployed [{}]", done.result());
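The Jaeger-specific block that used to sit in `main` is replaced by a single `TracingUtil.initialize(bridgeConfig)` call, so `Application` no longer needs to know which tracing stack is active. The tracing package itself is not part of this page; the sketch below only extrapolates from the call sites that are visible here (`TracingUtil.initialize`, `TracingUtil.getTracing`, `addTracingPropsToProducerConfig`), and everything beyond those names is an assumption.

```java
import java.util.Properties;

// Extrapolated sketch of the facade implied by the visible call sites;
// the real classes in io.strimzi.kafka.bridge.tracing may differ.
interface TracingHandle {
    // set up the underlying tracer (OpenTracing/Jaeger or OpenTelemetry)
    void initialize();

    // add whatever interceptor/configuration the active tracer needs to producer properties
    void addTracingPropsToProducerConfig(Properties props);
}

final class TracingUtil {
    // assumption: a do-nothing handle is used when bridge.tracing is not set
    private static TracingHandle tracing = new TracingHandle() {
        @Override
        public void initialize() { }
        @Override
        public void addTracingPropsToProducerConfig(Properties props) { }
    };

    private TracingUtil() { }

    public static void initialize(String tracingType) {
        // the real method takes the BridgeConfig; a String is used here to keep the sketch self-contained.
        // "jaeger" or "opentelemetry" would select a concrete TracingHandle before initializing it.
        tracing.initialize();
    }

    public static TracingHandle getTracing() {
        return tracing;
    }
}
```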
16 changes: 9 additions & 7 deletions src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java
@@ -28,13 +28,15 @@ public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry
this.jmxCollectorRegistry = jmxCollectorRegistry;
this.meterRegistry = meterRegistry;
if (this.meterRegistry instanceof PrometheusMeterRegistry) {
this.meterRegistry.config().namingConvention(new PrometheusNamingConvention() {
@Override
public String name(String name, Meter.Type type, String baseUnit) {
String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
return super.name(metricName, type, baseUnit);
}
});
this.meterRegistry.config().namingConvention(new MetricsNamingConvention());
}
}

private static class MetricsNamingConvention extends PrometheusNamingConvention {
@Override
public String name(String name, Meter.Type type, String baseUnit) {
String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
return super.name(metricName, type, baseUnit);
}
}

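The anonymous naming convention becomes a named nested class; behaviour is unchanged, and Vert.x metric names are still re-prefixed before the Prometheus convention of the superclass applies. Here is a standalone illustration of just the renaming rule (the real class is private to `MetricsReporter`):

```java
// Standalone illustration of the rename applied by MetricsNamingConvention;
// only names starting with "vertx." are rewritten.
public class MetricsRenameDemo {
    static String rename(String name) {
        return name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
    }

    public static void main(String[] args) {
        System.out.println(rename("vertx.http.server.requests")); // strimzi.bridge.http.server.requests
        System.out.println(rename("jvm.memory.used"));            // unchanged
    }
}
```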
8 changes: 2 additions & 6 deletions src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java
@@ -5,7 +5,6 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingConsumerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracker.OffsetTracker;
@@ -167,9 +166,6 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config)
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getConsumerConfig().getConfig());
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
if (this.bridgeConfig.getTracing() != null) {
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
}

if (config != null)
props.putAll(config);
@@ -200,7 +196,7 @@ protected void subscribe(boolean shouldAttachHandler) {
this.subscribed = true;
this.setPartitionsAssignmentHandlers();

Set<String> topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet());
Set<String> topics = this.topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet());
this.consumer.subscribe(topics, this::subscribeHandler);
}

@@ -710,7 +706,7 @@ protected void consume(Handler<AsyncResult<KafkaConsumerRecords<K, V>>> consumeH
this.consumer.poll(Duration.ofMillis(this.pollTimeOut), consumeHandler);
}

protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
Handler<AsyncResult<Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata>>> commitOffsetsHandler) {
this.consumer.commit(offsetsData, commitOffsetsHandler);
}
@@ -5,9 +5,10 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingProducerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
@@ -103,9 +104,9 @@ public void open() {
Properties props = new Properties();
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getProducerConfig().getConfig());
if (this.bridgeConfig.getTracing() != null) {
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
}

TracingHandle tracing = TracingUtil.getTracing();
tracing.addTracingPropsToProducerConfig(props);

this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer);

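On the producer side, the hard-coded OpenTracing interceptor is gone and the endpoint asks the tracing facade for whatever configuration it needs. Below is a standalone recap of that pattern; the facade interface is re-declared locally only to keep the sketch compilable, and the no-op-when-disabled behaviour is an assumption.

```java
import java.util.Properties;

// Recap of the producer wiring shown above as a standalone sketch.
// TracingHandle mirrors the facade type used in this PR; passing it in as a
// parameter keeps the sketch compilable on its own.
public class ProducerTracingSketch {
    interface TracingHandle {
        void addTracingPropsToProducerConfig(Properties props);
    }

    static Properties producerProps(Properties kafkaCommon, Properties producerOverrides, TracingHandle tracing) {
        Properties props = new Properties();
        props.putAll(kafkaCommon);
        props.putAll(producerOverrides);
        // No concrete interceptor class is named any more; whichever handle was
        // selected at startup (Jaeger/OpenTracing, OpenTelemetry, or a no-op)
        // decides what to add to the producer configuration.
        tracing.addTracingPropsToProducerConfig(props);
        return props;
    }
}
```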
@@ -8,6 +8,7 @@
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;

@@ -26,7 +27,7 @@ public T deserialize(String topic, byte[] data) {

try (ByteArrayInputStream b = new ByteArrayInputStream(data); ObjectInputStream o = new ObjectInputStream(b)) {
return (T) o.readObject();
} catch (Exception e) {
} catch (IOException | ClassNotFoundException e) {
throw new SerializationException("Error when deserializing", e);
}
}
@@ -10,6 +10,7 @@
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

@@ -20,7 +21,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {

}

@SuppressFBWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
@SuppressFBWarnings({"PZLA_PREFER_ZERO_LENGTH_ARRAYS"})
@Override
public byte[] serialize(String topic, T data) {

@@ -30,7 +31,7 @@ public byte[] serialize(String topic, T data) {
try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
o.writeObject(data);
return b.toByteArray();
} catch (Exception e) {
} catch (IOException e) {
throw new SerializationException("Error when serializing", e);
}
}