Skip to content

Commit

Permalink
Upgrade libs, and consume test
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Jul 11, 2022
1 parent 36240b5 commit c7c1ec6
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 31 deletions.
4 changes: 2 additions & 2 deletions config/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
bridge.id=my-bridge
# uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer
# OpenTracing support
bridge.tracing=jaeger
#bridge.tracing=jaeger
# OpenTelemetry support
#bridge.tracing=opentelemetry
bridge.tracing=opentelemetry

#Apache Kafka common
kafka.bootstrap.servers=localhost:9092
Expand Down
27 changes: 14 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@
<jaeger.version>1.8.1</jaeger.version>
<opentracing.version>0.33.0</opentracing.version>
<opentracing-kafka-client.version>0.1.15</opentracing-kafka-client.version>
<opentelemetry.version>1.9.0-alpha</opentelemetry.version>
<opentelemetry-stable.version>1.9.0</opentelemetry-stable.version>
<grpc.version>1.44.0</grpc.version>
<opentelemetry.alpha-version>1.15.0-alpha</opentelemetry.alpha-version>
<opentelemetry.version>1.15.0</opentelemetry.version>
<grpc.version>1.47.0</grpc.version>
<micrometer.version>1.3.9</micrometer.version>
<jmx-prometheus-collector.version>0.12.0</jmx-prometheus-collector.version>
<prometheus-simpleclient.version>0.7.0</prometheus-simpleclient.version>
Expand Down Expand Up @@ -273,47 +273,47 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<version>${opentelemetry.version}</version>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-common</artifactId>
<version>${opentelemetry.version}</version>
<version>${opentelemetry.alpha-version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>${opentelemetry-stable.version}</version>
<version>${opentelemetry.version}</version>
</dependency>
<!-- Use gRPC as the transport -->
<dependency>
Expand Down Expand Up @@ -568,6 +568,7 @@
<!-- OpenTelemetry - used via classpath configuration -->
<ignoredUnusedDeclaredDependency>io.opentelemetry:opentelemetry-sdk-trace</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opentelemetry.instrumentation:opentelemetry-instrumentation-api</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opentelemetry:opentelemetry-exporter-jaeger</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.grpc:grpc-netty-shaded</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

final class NoopTracingHandle implements TracingHandle {
@Override
public String envName() {
public String envServiceName() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ static void setCommonAttributes(SpanBuilder builder, RoutingContext routingConte
}

@Override
public String envName() {
public String envServiceName() {
return OPENTELEMETRY_SERVICE_NAME_ENV_KEY;
}

@Override
public String serviceName(BridgeConfig config) {
String serviceName = System.getenv(envName());
String serviceName = System.getenv(envServiceName());
if (serviceName == null) {
// legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set)
serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ static void setCommonTags(Span span, RoutingContext routingContext) {
}

@Override
public String envName() {
public String envServiceName() {
return Configuration.JAEGER_SERVICE_NAME;
}

@Override
public String serviceName(BridgeConfig config) {
return System.getenv(envName());
return System.getenv(envServiceName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ public final class TracingConstants {
public static final String KAFKA_SERVICE = "kafka";

public static final String JAEGER = "jaeger";

public static final String JAEGER_OPENTRACING = JAEGER;
public static final String OPENTELEMETRY = "opentelemetry";

public static final String OPENTELEMETRY_SERVICE_NAME_ENV_KEY = "OTEL_SERVICE_NAME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
public interface TracingHandle {
/**
* Tracing env var name.
* Tracing env var service name.
*
* @return tracing env var name
* @return tracing env var service name
*/
String envName();
String envServiceName();

/**
* Extract service name from bridge confing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.Map;
import java.util.Properties;

import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER_OPENTRACING;
import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER;
import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY;

/**
Expand All @@ -31,7 +31,7 @@ public static TracingHandle getTracing() {

public static void initialize(BridgeConfig config) {
String tracingConfig = config.getTracing();
if (tracingConfig != null && (tracingConfig.equals(JAEGER_OPENTRACING) || tracingConfig.equals(OPENTELEMETRY))) {
if (tracingConfig != null && (tracingConfig.equals(JAEGER) || tracingConfig.equals(OPENTELEMETRY))) {
boolean isOpenTelemetry = OPENTELEMETRY.equals(tracingConfig);
TracingHandle instance = isOpenTelemetry ? new OpenTelemetryHandle() : new OpenTracingHandle();

Expand All @@ -45,7 +45,7 @@ public static void initialize(BridgeConfig config) {
instance.initialize();
tracing = instance;
} else {
log.error("Tracing config cannot be initialized because {} environment variable is not defined", instance.envName());
log.error("Tracing config cannot be initialized because {} environment variable is not defined", instance.envServiceName());
}
}
}
Expand Down
49 changes: 46 additions & 3 deletions src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.netty.handler.codec.http.HttpResponseStatus;
import io.strimzi.kafka.bridge.BridgeContentType;
import io.strimzi.kafka.bridge.http.services.ConsumerService;
import io.strimzi.kafka.bridge.http.services.ProducerService;
import io.strimzi.kafka.bridge.utils.Urls;
import io.vertx.core.AsyncResult;
Expand All @@ -20,8 +21,10 @@
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -30,6 +33,9 @@
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -72,10 +78,10 @@ public void setUp() {
protected abstract TracingOptions tracingOptions();

@Test
public void testSmoke(VertxTestContext context) {
public void testSmoke(VertxTestContext context) throws Exception {
Vertx vertx = Vertx.vertx(new VertxOptions().setTracingOptions(tracingOptions()));

WebClient client = WebClient.create(vertx, (WebClientOptions) new WebClientOptions()
WebClient client = WebClient.create(vertx, new WebClientOptions()
.setDefaultHost(Urls.BRIDGE_HOST)
.setDefaultPort(Urls.BRIDGE_PORT)
.setTracingPolicy(TracingPolicy.ALWAYS)
Expand All @@ -91,8 +97,45 @@ public void testSmoke(VertxTestContext context) {
JsonObject root = new JsonObject();
root.put("records", records);

String topicName = "mytopic";

ProducerService.getInstance(client)
.sendRecordsRequest("mytopic", root, BridgeContentType.KAFKA_JSON_JSON)
.sendRecordsRequest(topicName, root, BridgeContentType.KAFKA_JSON_JSON)
.sendJsonObject(root, verifyOK(context));

ConsumerService consumerService = ConsumerService.getInstance(client);

// create consumer
// subscribe to a topic

String consumerName = "my-consumer";
String groupId = UUID.randomUUID().toString();

JsonObject consumerJson = new JsonObject()
.put("name", consumerName)
.put("format", "json");

consumerService
.createConsumer(context, groupId, consumerJson)
.subscribeConsumer(context, groupId, consumerName, topicName);

CompletableFuture<Boolean> consume = new CompletableFuture<>();
// consume records
consumerService
.consumeRecordsRequest(groupId, consumerName, BridgeContentType.KAFKA_JSON_JSON)
.as(BodyCodec.jsonArray())
.send(ar -> {
context.verify(() -> {
assertThat(ar.succeeded(), CoreMatchers.is(true));
HttpResponse<JsonArray> response = ar.result();
assertThat(response.statusCode(), CoreMatchers.is(HttpResponseStatus.OK.code()));
});
consume.complete(true);
});

consume.get(60, TimeUnit.SECONDS);

// consumer deletion
consumerService.deleteConsumer(context, groupId, consumerName);
}
}

0 comments on commit c7c1ec6

Please sign in to comment.