From 8da355305ae409a25dd58652162a6df3d7580a36 Mon Sep 17 00:00:00 2001 From: Bruno Baptista Date: Tue, 16 Jul 2024 16:22:41 +0100 Subject: [PATCH] Sampler to be used by the Azure endpoints (#195) * Sampler to be used by the Azure endpoints * Fix local variable * Sampler to be used by the Azure endpoints * Filter Azure URLs * Bump azure-monitor-opentelemetry-exporter * Rollback Azure exporter bump * Add test about the issue and fix the implemenation * Don't instrument stats beat endpoints --------- Co-authored-by: Jean Bisutti --- .../deployment/AzureExporterProcessor.java | 18 +++++ .../integration-tests/pom.xml | 6 ++ .../opentelemetry/exporter/it/AzureTest.java | 38 +++++++---- .../azure/runtime/AzureEndpointSampler.java | 67 +++++++++++++++++++ .../exporter/azure/runtime/AzureRecorder.java | 52 +++++++++++++- 5 files changed, 167 insertions(+), 14 deletions(-) create mode 100644 quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureEndpointSampler.java diff --git a/quarkus-opentelemetry-exporter-azure/deployment/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/deployment/AzureExporterProcessor.java b/quarkus-opentelemetry-exporter-azure/deployment/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/deployment/AzureExporterProcessor.java index 16a4c98..3757cec 100644 --- a/quarkus-opentelemetry-exporter-azure/deployment/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/deployment/AzureExporterProcessor.java +++ b/quarkus-opentelemetry-exporter-azure/deployment/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/deployment/AzureExporterProcessor.java @@ -20,6 +20,8 @@ import io.netty.handler.ssl.OpenSsl; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureEndpointSampler; import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureExporterBuildConfig; import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureExporterQuarkusRuntimeConfig; import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureExporterRuntimeConfig; @@ -148,4 +150,20 @@ SyntheticBeanBuildItem installBatchSpanProcessorForAzure(AzureRecorder recorder, .createWith(recorder.createSpanProcessorForAzure(runtimeConfig, quarkusRuntimeConfig)) .done(); } + + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + SyntheticBeanBuildItem installAzureEndpointSampler(AzureRecorder recorder, + AzureExporterRuntimeConfig runtimeConfig, + AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) { + return SyntheticBeanBuildItem.configure(AzureEndpointSampler.class) + .types(Sampler.class) + .setRuntimeInit() + .scope(Singleton.class) + .unremovable() + .addInjectionPoint(ParameterizedType.create(DotName.createSimple(Instance.class), + new Type[] { ClassType.create(DotName.createSimple(Sampler.class.getName())) }, null)) + .createWith(recorder.createSampler(runtimeConfig, quarkusRuntimeConfig)) + .done(); + } } diff --git a/quarkus-opentelemetry-exporter-azure/integration-tests/pom.xml b/quarkus-opentelemetry-exporter-azure/integration-tests/pom.xml index cc78ed4..228128b 100644 --- a/quarkus-opentelemetry-exporter-azure/integration-tests/pom.xml +++ b/quarkus-opentelemetry-exporter-azure/integration-tests/pom.xml @@ -56,6 +56,12 @@ ${quarkus-wiremock-test.version} test + + org.assertj + assertj-core + ${assertj-core.version} + test + diff --git a/quarkus-opentelemetry-exporter-azure/integration-tests/src/test/java/io/quarkiverse/opentelemetry/exporter/it/AzureTest.java b/quarkus-opentelemetry-exporter-azure/integration-tests/src/test/java/io/quarkiverse/opentelemetry/exporter/it/AzureTest.java index 30aec22..7a98b99 100644 --- a/quarkus-opentelemetry-exporter-azure/integration-tests/src/test/java/io/quarkiverse/opentelemetry/exporter/it/AzureTest.java +++ b/quarkus-opentelemetry-exporter-azure/integration-tests/src/test/java/io/quarkiverse/opentelemetry/exporter/it/AzureTest.java @@ -2,18 +2,22 @@ import static com.github.tomakehurst.wiremock.client.WireMock.*; import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.CoreMatchers.equalTo; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; import java.time.Duration; +import java.util.List; +import java.util.Optional; import java.util.concurrent.Callable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.shaded.org.awaitility.Awaitility; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.verification.LoggedRequest; import io.quarkus.test.junit.QuarkusTest; @@ -36,7 +40,7 @@ public void stopWireMock() { } @Test - void connectionTest() { + void connectionTest() throws InterruptedException { wireMockServer.stubFor( any(urlMatching(".*")) @@ -50,19 +54,27 @@ void connectionTest() { .statusCode(200) .body("message", equalTo("Direct trace")); - Awaitility.await() + await() .atMost(Duration.ofSeconds(10)) - .until(azureExportIsDone(wireMockServer)); + .until(telemetryDataContainTheHttpCall(wireMockServer)); + + // Non regression test for https://github.com/Azure/azure-sdk-for-java/issues/41040 + Thread.sleep(10_000); + List telemetryExport = wireMockServer.findAll(postRequestedFor(urlEqualTo("/export/v2.1/track"))); + List requestBodies = telemetryExport + .stream() + .map(request -> new String(request.getBody())).toList(); + requestBodies.stream().forEach(System.out::println); // It's convenient to print the telemetry data on the console to spot potential issues + Optional telemetryDataExport = requestBodies.stream() + .filter(body -> body.contains("RemoteDependency") && body.contains("POST /export/v2.1/track")) + .findAny(); + assertThat(telemetryDataExport).as("Telemetry export request should not appear as a dependency.").isEmpty(); } - private static Callable azureExportIsDone(WireMockServer wireMockServer) { - return () -> { - try { - wireMockServer.verify(1, postRequestedFor(urlEqualTo("/export/v2.1/track"))); - return Boolean.TRUE; - } catch (AssertionError e) { - return Boolean.FALSE; - } - }; + private static Callable telemetryDataContainTheHttpCall(WireMockServer wireMockServer) { + return () -> wireMockServer.findAll(postRequestedFor(urlEqualTo("/export/v2.1/track"))) + .stream() + .map(request -> new String(request.getBody())) + .anyMatch(body -> body.contains("Request") && body.contains("GET /direct")); } } diff --git a/quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureEndpointSampler.java b/quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureEndpointSampler.java new file mode 100644 index 0000000..55108d1 --- /dev/null +++ b/quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureEndpointSampler.java @@ -0,0 +1,67 @@ +package io.quarkiverse.opentelemetry.exporter.azure.runtime; + +import java.util.List; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import io.opentelemetry.semconv.SemanticAttributes; + +/** + * Sampler that drops spans based on the target of the request. + * Inspired by {@link io.quarkus.opentelemetry.runtime.tracing.DropTargetsSampler} + */ +public class AzureEndpointSampler implements Sampler { + + private final List dropTargets; + + public AzureEndpointSampler(final List dropTargets) { + this.dropTargets = dropTargets; + } + + @Override + public SamplingResult shouldSample(Context context, + String s, + String s1, + SpanKind spanKind, + Attributes attributes, + List list) { + if (spanKind.equals(SpanKind.CLIENT)) { + String target = attributes.get(SemanticAttributes.HTTP_URL); + if (target == null) { + target = attributes.get(SemanticAttributes.URL_FULL); + } + if (shouldDrop(target)) { + return SamplingResult.drop(); + } + } + return Sampler.alwaysOn().shouldSample(context, s, s1, spanKind, attributes, list); + } + + @Override + public String getDescription() { + return "azure-endpoint-sampler"; + } + + private boolean shouldDrop(String target) { + if ((target == null) || target.isEmpty()) { + return false; + } + if (safeContains(target)) { // check exact match + return true; + } + if (target.charAt(target.length() - 1) == '/') { // check if the path without the ending slash matched + if (safeContains(target.substring(0, target.length() - 1))) { + return true; + } + } + return false; + } + + private boolean safeContains(String target) { + return dropTargets.contains(target); + } +} diff --git a/quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureRecorder.java b/quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureRecorder.java index f8000dc..42a8b02 100644 --- a/quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureRecorder.java +++ b/quarkus-opentelemetry-exporter-azure/runtime/src/main/java/io/quarkiverse/opentelemetry/exporter/azure/runtime/AzureRecorder.java @@ -1,7 +1,8 @@ package io.quarkiverse.opentelemetry.exporter.azure.runtime; -import java.util.Optional; +import java.util.*; import java.util.function.Function; +import java.util.regex.Pattern; import com.azure.monitor.opentelemetry.exporter.AzureMonitorExporterBuilder; @@ -14,6 +15,8 @@ @Recorder public class AzureRecorder { + public static final Pattern SEMI_COLON_PATTERN = Pattern.compile(";"); + public Function, LateBoundSpanProcessor> createSpanProcessorForAzure( AzureExporterRuntimeConfig runtimeConfig, AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) { return new Function<>() { @@ -32,6 +35,43 @@ public LateBoundSpanProcessor apply(SyntheticCreationalContext, AzureEndpointSampler> createSampler( + AzureExporterRuntimeConfig runtimeConfig, AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) { + return new Function<>() { + @Override + public AzureEndpointSampler apply(SyntheticCreationalContext context) { + List ingestionUrls = findIngestionUrls(runtimeConfig, quarkusRuntimeConfig); + List statsBeatUrls = Arrays.asList("https://westeurope-5.in.applicationinsights.azure.com/", + "https://westus-0.in.applicationinsights.azure.com/"); + List dropTargets = new ArrayList<>(); + dropTargets.addAll(addTrackPartInUrl(ingestionUrls)); + dropTargets.addAll(addTrackPartInUrl(statsBeatUrls)); + return new AzureEndpointSampler(dropTargets); + } + }; + } + + private static List findIngestionUrls(AzureExporterRuntimeConfig runtimeConfig, + AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) { + String connectionString = findConnectionString(runtimeConfig, quarkusRuntimeConfig); + Optional ingestionEndpoint = extractIngestionEndpointFrom(connectionString); + if (ingestionEndpoint.isPresent()) { + return Collections.singletonList(ingestionEndpoint.get()); + } + return Arrays.asList("https://dc.services.visualstudio.com/", + "https://rt.services.visualstudio.com/"); + } + + private static Optional extractIngestionEndpointFrom(String connectionString) { + Optional ingestionEndpointInConnectionString = SEMI_COLON_PATTERN.splitAsStream(connectionString) + .filter(element -> element.startsWith("IngestionEndpoint=")).findFirst(); + if (ingestionEndpointInConnectionString.isEmpty()) { + return Optional.empty(); + } + return ingestionEndpointInConnectionString + .map(ingestionPartOfConnectionString -> ingestionPartOfConnectionString.replaceAll("IngestionEndpoint=", "")); + } + private static String findConnectionString(AzureExporterRuntimeConfig runtimeConfig, AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) { Optional azureConnectionString = runtimeConfig.connectionString(); @@ -41,4 +81,14 @@ private static String findConnectionString(AzureExporterRuntimeConfig runtimeCon return quarkusRuntimeConfig.connectionString() .orElseThrow(() -> new IllegalStateException("Azure connection string is missing")); } + + private static List addTrackPartInUrl(List ingestionUrls) { + return ingestionUrls.stream() + .map(ingestionUrl -> { + if (ingestionUrl.endsWith("/")) { + return ingestionUrl; + } + return ingestionUrl + "/"; + }).map(ingestionUrl -> ingestionUrl + "v2.1/track").toList(); + } }