diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-aws.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-aws.txt index df26146497b..fee644f2fb2 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-aws.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-aws.txt @@ -1,2 +1,16 @@ Comparing source compatibility of against -No changes. \ No newline at end of file ++++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSampler (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW METHOD: PUBLIC(+) void close() + +++ NEW METHOD: PUBLIC(+) java.lang.String getDescription() + +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder newBuilder(io.opentelemetry.sdk.resources.Resource) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.samplers.SamplingResult shouldSample(io.opentelemetry.context.Context, java.lang.String, java.lang.String, io.opentelemetry.api.trace.SpanKind, io.opentelemetry.api.common.Attributes, java.util.List) ++++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSampler build() + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setEndpoint(java.lang.String) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setInitialSampler(io.opentelemetry.sdk.trace.samplers.Sampler) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setPollingInterval(java.time.Duration) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setPollingInterval(long, java.util.concurrent.TimeUnit) diff --git a/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSampler.java b/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSampler.java new file mode 100644 index 00000000000..0a9f8fed4f0 --- /dev/null +++ b/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSampler.java @@ -0,0 +1,105 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.aws.trace; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.internal.DaemonThreadFactory; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** Remote sampler that gets sampling configuration from AWS X-Ray. */ +public final class AwsXrayRemoteSampler implements Sampler, Closeable { + + private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName()); + + private static final String WORKER_THREAD_NAME = + AwsXrayRemoteSampler.class.getSimpleName() + "_WorkerThread"; + + private final Resource resource; + private final Sampler initialSampler; + private final XraySamplerClient client; + private final ScheduledExecutorService executor; + private final ScheduledFuture pollFuture; + + @Nullable private volatile GetSamplingRulesResponse previousRulesResponse; + private volatile Sampler sampler; + + /** + * Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link + * Resource} should be the same as what the {@linkplain io.opentelemetry.sdk.OpenTelemetrySdk + * OpenTelemetry SDK} is configured with. + */ + // TODO(anuraaga): Deprecate after + // https://github.com/open-telemetry/opentelemetry-specification/issues/1588 + public static AwsXrayRemoteSamplerBuilder newBuilder(Resource resource) { + return new AwsXrayRemoteSamplerBuilder(resource); + } + + AwsXrayRemoteSampler( + Resource resource, String endpoint, Sampler initialSampler, long pollingIntervalNanos) { + this.resource = resource; + this.initialSampler = initialSampler; + client = new XraySamplerClient(endpoint); + executor = + Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(WORKER_THREAD_NAME)); + + sampler = initialSampler; + + pollFuture = + executor.scheduleAtFixedRate( + this::getAndUpdateSampler, 0, pollingIntervalNanos, TimeUnit.NANOSECONDS); + } + + @Override + public SamplingResult shouldSample( + Context parentContext, + String traceId, + String name, + SpanKind spanKind, + Attributes attributes, + List parentLinks) { + return sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + } + + @Override + public String getDescription() { + return "AwsXrayRemoteSampler{" + sampler.getDescription() + "}"; + } + + private void getAndUpdateSampler() { + try { + // No pagination support yet, or possibly ever. + GetSamplingRulesResponse response = + client.getSamplingRules(GetSamplingRulesRequest.create(null)); + if (!response.equals(previousRulesResponse)) { + sampler = new SamplingRulesSampler(resource, initialSampler, response.getSamplingRules()); + previousRulesResponse = response; + } + } catch (Throwable t) { + logger.log(Level.FINE, "Failed to update sampler", t); + } + } + + @Override + public void close() { + pollFuture.cancel(true); + executor.shutdownNow(); + // No flushing behavior so no need to wait for the shutdown. + } +} diff --git a/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSamplerBuilder.java b/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSamplerBuilder.java new file mode 100644 index 00000000000..66b52401b0f --- /dev/null +++ b/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSamplerBuilder.java @@ -0,0 +1,77 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.aws.trace; + +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** A builder for {@link AwsXrayRemoteSampler}. */ +public final class AwsXrayRemoteSamplerBuilder { + + private static final String DEFAULT_ENDPOINT = "http://localhost:2000"; + private static final long DEFAULT_POLLING_INTERVAL_SECS = 300; + + private final Resource resource; + + private String endpoint = DEFAULT_ENDPOINT; + private Sampler initialSampler = Sampler.parentBased(Sampler.alwaysOn()); + private long pollingIntervalNanos = TimeUnit.SECONDS.toNanos(DEFAULT_POLLING_INTERVAL_SECS); + + AwsXrayRemoteSamplerBuilder(Resource resource) { + this.resource = resource; + } + + /** + * Sets the endpoint for the TCP proxy to connect to. This is the address to the port on the + * OpenTelemetry Collector configured for proxying X-Ray sampling requests. If unset, defaults to + * {@value DEFAULT_ENDPOINT}. + */ + public AwsXrayRemoteSamplerBuilder setEndpoint(String endpoint) { + requireNonNull(endpoint, "endpoint"); + this.endpoint = endpoint; + return this; + } + + /** + * Sets the polling interval for configuration updates. If unset, defaults to {@value + * DEFAULT_POLLING_INTERVAL_SECS}s. Must be positive. + */ + public AwsXrayRemoteSamplerBuilder setPollingInterval(Duration delay) { + requireNonNull(delay, "delay"); + return setPollingInterval(delay.toNanos(), TimeUnit.NANOSECONDS); + } + + /** + * Sets the polling interval for configuration updates. If unset, defaults to {@value + * DEFAULT_POLLING_INTERVAL_SECS}s. Must be positive. + */ + public AwsXrayRemoteSamplerBuilder setPollingInterval(long delay, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(delay >= 0, "delay must be non-negative"); + pollingIntervalNanos = unit.toNanos(delay); + return this; + } + + /** + * Sets the initial sampler that is used before sampling configuration is obtained. If unset, + * defaults to a parent-based always-on sampler. + */ + public AwsXrayRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) { + requireNonNull(initialSampler, "initialSampler"); + this.initialSampler = initialSampler; + return this; + } + + /** Returns a {@link AwsXrayRemoteSampler} with the configuration of this builder. */ + public AwsXrayRemoteSampler build() { + return new AwsXrayRemoteSampler(resource, endpoint, initialSampler, pollingIntervalNanos); + } +} diff --git a/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/SamplingRulesSampler.java b/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/SamplingRulesSampler.java new file mode 100644 index 00000000000..ef19fee40f2 --- /dev/null +++ b/sdk-extensions/aws/src/main/java/io/opentelemetry/sdk/extension/aws/trace/SamplingRulesSampler.java @@ -0,0 +1,73 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.aws.trace; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +final class SamplingRulesSampler implements Sampler { + + private static final Logger logger = Logger.getLogger(SamplingRulesSampler.class.getName()); + + private final Resource resource; + private final Sampler fallbackSampler; + private final SamplingRuleApplier[] ruleAppliers; + + SamplingRulesSampler( + Resource resource, + Sampler fallbackSampler, + List rules) { + this.resource = resource; + this.fallbackSampler = fallbackSampler; + ruleAppliers = + rules.stream() + .map(GetSamplingRulesResponse.SamplingRuleRecord::getRule) + // Lower priority value takes precedence so normal ascending sort. + .sorted(Comparator.comparingInt(GetSamplingRulesResponse.SamplingRule::getPriority)) + .map(SamplingRuleApplier::new) + .toArray(SamplingRuleApplier[]::new); + } + + @Override + public SamplingResult shouldSample( + Context parentContext, + String traceId, + String name, + SpanKind spanKind, + Attributes attributes, + List parentLinks) { + for (SamplingRuleApplier applier : ruleAppliers) { + if (applier.matches(name, attributes, resource)) { + return applier.shouldSample( + parentContext, traceId, name, spanKind, attributes, parentLinks); + } + } + + // In practice, X-Ray always returns a Default rule that matches all requests so it is a bug in + // our code or X-Ray to reach here, fallback just in case. + logger.log( + Level.FINE, + "No sampling rule matched the request. " + + "This is a bug in either the OpenTelemetry SDK or X-Ray."); + return fallbackSampler.shouldSample( + parentContext, traceId, name, spanKind, attributes, parentLinks); + } + + @Override + public String getDescription() { + return "XrayRulesSampler{" + Arrays.toString(ruleAppliers) + "}"; + } +} diff --git a/sdk-extensions/aws/src/test/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSamplerTest.java b/sdk-extensions/aws/src/test/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSamplerTest.java new file mode 100644 index 00000000000..a697b8c7caf --- /dev/null +++ b/sdk-extensions/aws/src/test/java/io/opentelemetry/sdk/extension/aws/trace/AwsXrayRemoteSamplerTest.java @@ -0,0 +1,190 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.aws.trace; + +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.google.common.io.ByteStreams; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class AwsXrayRemoteSamplerTest { + + private static final byte[] RESPONSE_1; + private static final byte[] RESPONSE_2; + + static { + try { + RESPONSE_1 = + ByteStreams.toByteArray( + requireNonNull( + AwsXrayRemoteSamplerTest.class.getResourceAsStream( + "/test-sampling-rules-response-1.json"))); + RESPONSE_2 = + ByteStreams.toByteArray( + requireNonNull( + AwsXrayRemoteSamplerTest.class.getResourceAsStream( + "/test-sampling-rules-response-2.json"))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static final AtomicReference response = new AtomicReference<>(); + + private static final String TRACE_ID = TraceId.fromLongs(1, 2); + + @RegisterExtension + public static final ServerExtension server = + new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service( + "/GetSamplingRules", + (ctx, req) -> { + byte[] response = AwsXrayRemoteSamplerTest.response.get(); + if (response == null) { + // Error out until the test configures a response, the sampler will use the + // initial + // sampler in the meantime. + return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR); + } + return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, response); + }); + } + }; + + private AwsXrayRemoteSampler sampler; + + @BeforeEach + void setUp() { + sampler = + AwsXrayRemoteSampler.newBuilder(Resource.empty()) + .setInitialSampler(Sampler.alwaysOn()) + .setEndpoint(server.httpUri().toString()) + .setPollingInterval(Duration.ofMillis(10)) + .build(); + } + + @AfterEach + void tearDown() { + sampler.close(); + response.set(null); + } + + @Test + void getAndUpdate() { + // Initial Sampler allows all. + assertThat( + sampler + .shouldSample( + Context.root(), + TRACE_ID, + "cat-service", + SpanKind.SERVER, + Attributes.empty(), + Collections.emptyList()) + .getDecision()) + .isEqualTo(SamplingDecision.RECORD_AND_SAMPLE); + assertThat( + sampler + .shouldSample( + Context.root(), + TRACE_ID, + "dog-service", + SpanKind.SERVER, + Attributes.empty(), + Collections.emptyList()) + .getDecision()) + .isEqualTo(SamplingDecision.RECORD_AND_SAMPLE); + + response.set(RESPONSE_1); + + // cat-service allowed, others dropped + await() + .untilAsserted( + () -> { + assertThat( + sampler + .shouldSample( + Context.root(), + TRACE_ID, + "cat-service", + SpanKind.SERVER, + Attributes.empty(), + Collections.emptyList()) + .getDecision()) + .isEqualTo(SamplingDecision.RECORD_AND_SAMPLE); + assertThat( + sampler + .shouldSample( + Context.root(), + TRACE_ID, + "dog-service", + SpanKind.SERVER, + Attributes.empty(), + Collections.emptyList()) + .getDecision()) + .isEqualTo(SamplingDecision.DROP); + }); + + response.set(RESPONSE_2); + + // cat-service dropped, others allowed + await() + .untilAsserted( + () -> { + assertThat( + sampler + .shouldSample( + Context.root(), + TRACE_ID, + "cat-service", + SpanKind.SERVER, + Attributes.empty(), + Collections.emptyList()) + .getDecision()) + .isEqualTo(SamplingDecision.DROP); + assertThat( + sampler + .shouldSample( + Context.root(), + TRACE_ID, + "dog-service", + SpanKind.SERVER, + Attributes.empty(), + Collections.emptyList()) + .getDecision()) + .isEqualTo(SamplingDecision.RECORD_AND_SAMPLE); + }); + } + + @Test + void initialSampler() { + assertThat(sampler.getDescription()).isEqualTo("AwsXrayRemoteSampler{AlwaysOnSampler}"); + } +} diff --git a/sdk-extensions/aws/src/test/resources/test-sampling-rules-response-1.json b/sdk-extensions/aws/src/test/resources/test-sampling-rules-response-1.json new file mode 100644 index 00000000000..2b121c7a692 --- /dev/null +++ b/sdk-extensions/aws/src/test/resources/test-sampling-rules-response-1.json @@ -0,0 +1,42 @@ +{ + "SamplingRuleRecords": [ + { + "SamplingRule": { + "RuleName": "Test", + "RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Test", + "ResourceARN": "*", + "Priority": 1, + "FixedRate": 1.0, + "ReservoirSize": 1, + "ServiceName": "cat-service", + "ServiceType": "*", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": {} + }, + "CreatedAt": "2021-06-18T17:28:15+09:00", + "ModifiedAt": "2021-06-18T17:28:15+09:00" + }, + { + "SamplingRule": { + "RuleName": "Default", + "RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Default", + "ResourceARN": "*", + "Priority": 10000, + "FixedRate": 0.0, + "ReservoirSize": 1, + "ServiceName": "*", + "ServiceType": "*", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": {} + }, + "CreatedAt": "1970-01-01T09:00:00+09:00", + "ModifiedAt": "1970-01-01T09:00:00+09:00" + } + ] +} \ No newline at end of file diff --git a/sdk-extensions/aws/src/test/resources/test-sampling-rules-response-2.json b/sdk-extensions/aws/src/test/resources/test-sampling-rules-response-2.json new file mode 100644 index 00000000000..19929eb4d07 --- /dev/null +++ b/sdk-extensions/aws/src/test/resources/test-sampling-rules-response-2.json @@ -0,0 +1,42 @@ +{ + "SamplingRuleRecords": [ + { + "SamplingRule": { + "RuleName": "Test", + "RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Test", + "ResourceARN": "*", + "Priority": 1, + "FixedRate": 0.0, + "ReservoirSize": 1, + "ServiceName": "cat-service", + "ServiceType": "*", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": {} + }, + "CreatedAt": "2021-06-18T17:28:15+09:00", + "ModifiedAt": "2021-06-18T17:28:15+09:00" + }, + { + "SamplingRule": { + "RuleName": "Default", + "RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Default", + "ResourceARN": "*", + "Priority": 10000, + "FixedRate": 1.0, + "ReservoirSize": 1, + "ServiceName": "*", + "ServiceType": "*", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": {} + }, + "CreatedAt": "1970-01-01T09:00:00+09:00", + "ModifiedAt": "1970-01-01T09:00:00+09:00" + } + ] +} \ No newline at end of file