Skip to content

Commit

Permalink
Sampler to be used by the Azure endpoints (#195)
Browse files Browse the repository at this point in the history
* Sampler to be used by the Azure endpoints

* Fix local variable

* Sampler to be used by the Azure endpoints

* Filter Azure URLs

* Bump azure-monitor-opentelemetry-exporter

* Rollback Azure exporter bump

* Add test about the issue and fix the implemenation

* Don't instrument stats beat endpoints

---------

Co-authored-by: Jean Bisutti <[email protected]>
  • Loading branch information
brunobat and jeanbisutti authored Jul 16, 2024
1 parent 8dd6a78 commit 8da3553
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.netty.handler.ssl.OpenSsl;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureEndpointSampler;
import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureExporterBuildConfig;
import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureExporterQuarkusRuntimeConfig;
import io.quarkiverse.opentelemetry.exporter.azure.runtime.AzureExporterRuntimeConfig;
Expand Down Expand Up @@ -148,4 +150,20 @@ SyntheticBeanBuildItem installBatchSpanProcessorForAzure(AzureRecorder recorder,
.createWith(recorder.createSpanProcessorForAzure(runtimeConfig, quarkusRuntimeConfig))
.done();
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
SyntheticBeanBuildItem installAzureEndpointSampler(AzureRecorder recorder,
AzureExporterRuntimeConfig runtimeConfig,
AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) {
return SyntheticBeanBuildItem.configure(AzureEndpointSampler.class)
.types(Sampler.class)
.setRuntimeInit()
.scope(Singleton.class)
.unremovable()
.addInjectionPoint(ParameterizedType.create(DotName.createSimple(Instance.class),
new Type[] { ClassType.create(DotName.createSimple(Sampler.class.getName())) }, null))
.createWith(recorder.createSampler(runtimeConfig, quarkusRuntimeConfig))
.done();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
<version>${quarkus-wiremock-test.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.verification.LoggedRequest;

import io.quarkus.test.junit.QuarkusTest;

Expand All @@ -36,7 +40,7 @@ public void stopWireMock() {
}

@Test
void connectionTest() {
void connectionTest() throws InterruptedException {

wireMockServer.stubFor(
any(urlMatching(".*"))
Expand All @@ -50,19 +54,27 @@ void connectionTest() {
.statusCode(200)
.body("message", equalTo("Direct trace"));

Awaitility.await()
await()
.atMost(Duration.ofSeconds(10))
.until(azureExportIsDone(wireMockServer));
.until(telemetryDataContainTheHttpCall(wireMockServer));

// Non regression test for https://github.com/Azure/azure-sdk-for-java/issues/41040
Thread.sleep(10_000);
List<LoggedRequest> telemetryExport = wireMockServer.findAll(postRequestedFor(urlEqualTo("/export/v2.1/track")));
List<String> requestBodies = telemetryExport
.stream()
.map(request -> new String(request.getBody())).toList();
requestBodies.stream().forEach(System.out::println); // It's convenient to print the telemetry data on the console to spot potential issues
Optional<String> telemetryDataExport = requestBodies.stream()
.filter(body -> body.contains("RemoteDependency") && body.contains("POST /export/v2.1/track"))
.findAny();
assertThat(telemetryDataExport).as("Telemetry export request should not appear as a dependency.").isEmpty();
}

private static Callable<Boolean> azureExportIsDone(WireMockServer wireMockServer) {
return () -> {
try {
wireMockServer.verify(1, postRequestedFor(urlEqualTo("/export/v2.1/track")));
return Boolean.TRUE;
} catch (AssertionError e) {
return Boolean.FALSE;
}
};
private static Callable<Boolean> telemetryDataContainTheHttpCall(WireMockServer wireMockServer) {
return () -> wireMockServer.findAll(postRequestedFor(urlEqualTo("/export/v2.1/track")))
.stream()
.map(request -> new String(request.getBody()))
.anyMatch(body -> body.contains("Request") && body.contains("GET /direct"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkiverse.opentelemetry.exporter.azure.runtime;

import java.util.List;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import io.opentelemetry.semconv.SemanticAttributes;

/**
* Sampler that drops spans based on the target of the request.
* Inspired by {@link io.quarkus.opentelemetry.runtime.tracing.DropTargetsSampler}
*/
public class AzureEndpointSampler implements Sampler {

private final List<String> dropTargets;

public AzureEndpointSampler(final List<String> dropTargets) {
this.dropTargets = dropTargets;
}

@Override
public SamplingResult shouldSample(Context context,
String s,
String s1,
SpanKind spanKind,
Attributes attributes,
List<LinkData> list) {
if (spanKind.equals(SpanKind.CLIENT)) {
String target = attributes.get(SemanticAttributes.HTTP_URL);
if (target == null) {
target = attributes.get(SemanticAttributes.URL_FULL);
}
if (shouldDrop(target)) {
return SamplingResult.drop();
}
}
return Sampler.alwaysOn().shouldSample(context, s, s1, spanKind, attributes, list);
}

@Override
public String getDescription() {
return "azure-endpoint-sampler";
}

private boolean shouldDrop(String target) {
if ((target == null) || target.isEmpty()) {
return false;
}
if (safeContains(target)) { // check exact match
return true;
}
if (target.charAt(target.length() - 1) == '/') { // check if the path without the ending slash matched
if (safeContains(target.substring(0, target.length() - 1))) {
return true;
}
}
return false;
}

private boolean safeContains(String target) {
return dropTargets.contains(target);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.quarkiverse.opentelemetry.exporter.azure.runtime;

import java.util.Optional;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;

import com.azure.monitor.opentelemetry.exporter.AzureMonitorExporterBuilder;

Expand All @@ -14,6 +15,8 @@
@Recorder
public class AzureRecorder {

public static final Pattern SEMI_COLON_PATTERN = Pattern.compile(";");

public Function<SyntheticCreationalContext<LateBoundSpanProcessor>, LateBoundSpanProcessor> createSpanProcessorForAzure(
AzureExporterRuntimeConfig runtimeConfig, AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) {
return new Function<>() {
Expand All @@ -32,6 +35,43 @@ public LateBoundSpanProcessor apply(SyntheticCreationalContext<LateBoundSpanProc
};
}

public Function<SyntheticCreationalContext<AzureEndpointSampler>, AzureEndpointSampler> createSampler(
AzureExporterRuntimeConfig runtimeConfig, AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) {
return new Function<>() {
@Override
public AzureEndpointSampler apply(SyntheticCreationalContext<AzureEndpointSampler> context) {
List<String> ingestionUrls = findIngestionUrls(runtimeConfig, quarkusRuntimeConfig);
List<String> statsBeatUrls = Arrays.asList("https://westeurope-5.in.applicationinsights.azure.com/",
"https://westus-0.in.applicationinsights.azure.com/");
List<String> dropTargets = new ArrayList<>();
dropTargets.addAll(addTrackPartInUrl(ingestionUrls));
dropTargets.addAll(addTrackPartInUrl(statsBeatUrls));
return new AzureEndpointSampler(dropTargets);
}
};
}

private static List<String> findIngestionUrls(AzureExporterRuntimeConfig runtimeConfig,
AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) {
String connectionString = findConnectionString(runtimeConfig, quarkusRuntimeConfig);
Optional<String> ingestionEndpoint = extractIngestionEndpointFrom(connectionString);
if (ingestionEndpoint.isPresent()) {
return Collections.singletonList(ingestionEndpoint.get());
}
return Arrays.asList("https://dc.services.visualstudio.com/",
"https://rt.services.visualstudio.com/");
}

private static Optional<String> extractIngestionEndpointFrom(String connectionString) {
Optional<String> ingestionEndpointInConnectionString = SEMI_COLON_PATTERN.splitAsStream(connectionString)
.filter(element -> element.startsWith("IngestionEndpoint=")).findFirst();
if (ingestionEndpointInConnectionString.isEmpty()) {
return Optional.empty();
}
return ingestionEndpointInConnectionString
.map(ingestionPartOfConnectionString -> ingestionPartOfConnectionString.replaceAll("IngestionEndpoint=", ""));
}

private static String findConnectionString(AzureExporterRuntimeConfig runtimeConfig,
AzureExporterQuarkusRuntimeConfig quarkusRuntimeConfig) {
Optional<String> azureConnectionString = runtimeConfig.connectionString();
Expand All @@ -41,4 +81,14 @@ private static String findConnectionString(AzureExporterRuntimeConfig runtimeCon
return quarkusRuntimeConfig.connectionString()
.orElseThrow(() -> new IllegalStateException("Azure connection string is missing"));
}

private static List<String> addTrackPartInUrl(List<String> ingestionUrls) {
return ingestionUrls.stream()
.map(ingestionUrl -> {
if (ingestionUrl.endsWith("/")) {
return ingestionUrl;
}
return ingestionUrl + "/";
}).map(ingestionUrl -> ingestionUrl + "v2.1/track").toList();
}
}

0 comments on commit 8da3553

Please sign in to comment.