Skip to content

Commit

Permalink
Add OpenTelemetry support.
Browse files Browse the repository at this point in the history
Signed-off-by: Ales Justin <[email protected]>
  • Loading branch information
alesj committed Jan 31, 2022
1 parent b6439d7 commit 494dc8b
Show file tree
Hide file tree
Showing 22 changed files with 851 additions and 154 deletions.
5 changes: 5 additions & 0 deletions bin/kafka_bridge_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ fi
# Make sure that we use /dev/urandom
JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp/vertx-cache -Djava.security.egd=file:/dev/./urandom"

# enabling OpenTelemetry with Jaeger
if [ -n "$OTEL_SERVICE_NAME" ]; then
export OTEL_TRACES_EXPORTER="jaeger"
fi

exec java $JAVA_OPTS $KAFKA_BRIDGE_LOG4J_OPTS -classpath "${MYPATH}/../libs/*" io.strimzi.kafka.bridge.Application "$@"
6 changes: 5 additions & 1 deletion config/application.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#Bridge related settings
bridge.id=my-bridge
# uncomment the following line to enable Jaeger tracing, check the documentation how to configure the tracer
# uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer
# OpenTracing support
#bridge.tracing=jaeger
# OpenTelemetry support
#bridge.tracing=jaeger-otel
#bridge.tracing.service-name=strimzi-kafka-bridge

#Apache Kafka common
kafka.bootstrap.servers=localhost:9092
Expand Down
45 changes: 45 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@
<jaeger.version>1.6.0</jaeger.version>
<opentracing.version>0.33.0</opentracing.version>
<opentracing-kafka-client.version>0.1.15</opentracing-kafka-client.version>
<opentelemetry.version>1.7.0-alpha</opentelemetry.version>
<opentelemetry-stable.version>1.7.0</opentelemetry-stable.version>
<grpc.version>1.41.0</grpc.version>
<micrometer.version>1.3.9</micrometer.version>
<jmx-prometheus-collector.version>0.12.0</jmx-prometheus-collector.version>
<commons-cli.version>1.4</commons-cli.version>
Expand Down Expand Up @@ -203,6 +206,42 @@
<artifactId>opentracing-kafka-client</artifactId>
<version>${opentracing-kafka-client.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${opentelemetry-stable.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-common</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>${opentelemetry-stable.version}</version>
</dependency>
<!-- Use gRPC as the transport -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
Expand Down Expand Up @@ -295,6 +334,12 @@
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-opentelemetry</artifactId>
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
46 changes: 19 additions & 27 deletions src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@

package io.strimzi.kafka.bridge;

import io.jaegertracing.Configuration;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import io.strimzi.kafka.bridge.amqp.AmqpBridge;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.http.HttpBridge;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
Expand All @@ -30,6 +28,7 @@
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,7 +66,7 @@ public static void main(String[] args) {
try {
VertxOptions vertxOptions = new VertxOptions();
JmxCollectorRegistry jmxCollectorRegistry = null;
if (Boolean.valueOf(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
log.info("Metrics enabled and exposed on the /metrics endpoint");
// setup Micrometer metrics options
vertxOptions.setMetricsOptions(metricsOptions());
Expand All @@ -82,17 +81,17 @@ public static void main(String[] args) {
CommandLine commandLine = new DefaultParser().parse(generateOptions(), args);

ConfigStoreOptions fileStore = new ConfigStoreOptions()
.setType("file")
.setFormat("properties")
.setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true));
.setType("file")
.setFormat("properties")
.setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true));

ConfigStoreOptions envStore = new ConfigStoreOptions()
.setType("env")
.setConfig(new JsonObject().put("raw-data", true));
.setType("env")
.setConfig(new JsonObject().put("raw-data", true));

ConfigRetrieverOptions options = new ConfigRetrieverOptions()
.addStore(fileStore)
.addStore(envStore);
.addStore(fileStore)
.addStore(envStore);

ConfigRetriever retriever = ConfigRetriever.create(vertx, options);
retriever.getConfig(ar -> {
Expand Down Expand Up @@ -126,43 +125,36 @@ public static void main(String[] args) {
}
}

// register Jaeger tracer - if set, etc
TracingUtil.initialize(bridgeConfig);

// when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well,
// so no need for a standalone embedded HTTP server
if (!bridgeConfig.getHttpConfig().isEnabled()) {
EmbeddedHttpServer embeddedHttpServer =
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
embeddedHttpServer.start();
}

// register OpenTracing Jaeger tracer
if ("jaeger".equals(bridgeConfig.getTracing())) {
if (config.get(Configuration.JAEGER_SERVICE_NAME) != null) {
Tracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.registerIfAbsent(tracer);
} else {
log.error("Jaeger tracing cannot be initialized because {} environment variable is not defined", Configuration.JAEGER_SERVICE_NAME);
}
}
}
});
} else {
log.error("Error starting the bridge", ar.cause());
System.exit(1);
}
});
} catch (Exception ex) {
log.error("Error starting the bridge", ex);
} catch (RuntimeException | MalformedObjectNameException | IOException | ParseException e) {
log.error("Error starting the bridge", e);
System.exit(1);
}
}

/**
* Set up the Vert.x metrics options
*
*
* @return instance of the MicrometerMetricsOptions on Vert.x
*/
private static MicrometerMetricsOptions metricsOptions() {
Set<String> set = new HashSet();
Set<String> set = new HashSet<>();
set.add(MetricsDomain.NAMED_POOLS.name());
set.add(MetricsDomain.VERTICLES.name());
return new MicrometerMetricsOptions()
Expand Down Expand Up @@ -218,7 +210,7 @@ private static Future<HttpBridge> deployHttpBridge(Vertx vertx, BridgeConfig bri

if (bridgeConfig.getHttpConfig().isEnabled()) {
HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter);

vertx.deployVerticle(httpBridge, done -> {
if (done.succeeded()) {
log.info("HTTP verticle instance deployed [{}]", done.result());
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry
this.jmxCollectorRegistry = jmxCollectorRegistry;
this.meterRegistry = meterRegistry;
if (this.meterRegistry instanceof PrometheusMeterRegistry) {
this.meterRegistry.config().namingConvention(new PrometheusNamingConvention() {
@Override
public String name(String name, Meter.Type type, String baseUnit) {
String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
return super.name(metricName, type, baseUnit);
}
});
this.meterRegistry.config().namingConvention(new MetricsNamingConvention());
}
}

private static class MetricsNamingConvention extends PrometheusNamingConvention {
@Override
public String name(String name, Meter.Type type, String baseUnit) {
String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
return super.name(metricName, type, baseUnit);
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingConsumerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.strimzi.kafka.bridge.tracker.OffsetTracker;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
Expand Down Expand Up @@ -167,9 +168,9 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config)
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getConsumerConfig().getConfig());
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
if (this.bridgeConfig.getTracing() != null) {
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
}

TracingHandle tracing = TracingUtil.getTracing();
tracing.kafkaConsumerConfig(props);

if (config != null)
props.putAll(config);
Expand Down Expand Up @@ -200,7 +201,7 @@ protected void subscribe(boolean shouldAttachHandler) {
this.subscribed = true;
this.setPartitionsAssignmentHandlers();

Set<String> topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet());
Set<String> topics = this.topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet());
this.consumer.subscribe(topics, this::subscribeHandler);
}

Expand Down Expand Up @@ -710,7 +711,7 @@ protected void consume(Handler<AsyncResult<KafkaConsumerRecords<K, V>>> consumeH
this.consumer.poll(Duration.ofMillis(this.pollTimeOut), consumeHandler);
}

protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
protected void commit(Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsetsData,
Handler<AsyncResult<Map<TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata>>> commitOffsetsHandler) {
this.consumer.commit(offsetsData, commitOffsetsHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package io.strimzi.kafka.bridge;

import io.opentracing.contrib.kafka.TracingProducerInterceptor;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -103,9 +104,9 @@ public void open() {
Properties props = new Properties();
props.putAll(kafkaConfig.getConfig());
props.putAll(kafkaConfig.getProducerConfig().getConfig());
if (this.bridgeConfig.getTracing() != null) {
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
}

TracingHandle tracing = TracingUtil.getTracing();
tracing.kafkaProducerConfig(props);

this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class BridgeConfig extends AbstractConfig {

public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id";
public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing";
public static final String TRACING_SERVICE_NAME_TYPE = TRACING_TYPE + ".service-name";

private KafkaConfig kafkaConfig;
private AmqpConfig amqpConfig;
Expand Down Expand Up @@ -103,4 +104,12 @@ public String getTracing() {
return config.get(BridgeConfig.TRACING_TYPE).toString();
}
}

public String getTracingServiceName() {
if (config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE) == null) {
return null;
} else {
return config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE).toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;

Expand All @@ -26,7 +27,7 @@ public T deserialize(String topic, byte[] data) {

try (ByteArrayInputStream b = new ByteArrayInputStream(data); ObjectInputStream o = new ObjectInputStream(b)) {
return (T) o.readObject();
} catch (Exception e) {
} catch (IOException | ClassNotFoundException e) {
throw new SerializationException("Error when deserializing", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

Expand All @@ -20,7 +21,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {

}

@SuppressFBWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
@SuppressFBWarnings({"PZLA_PREFER_ZERO_LENGTH_ARRAYS"})
@Override
public byte[] serialize(String topic, T data) {

Expand All @@ -30,7 +31,7 @@ public byte[] serialize(String topic, T data) {
try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
o.writeObject(data);
return b.toByteArray();
} catch (Exception e) {
} catch (IOException e) {
throw new SerializationException("Error when serializing", e);
}
}
Expand Down
Loading

0 comments on commit 494dc8b

Please sign in to comment.