-
Notifications
You must be signed in to change notification settings - Fork 851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the actual XRay remote sampler which polls rules and orders / app… #3343
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,16 @@ | ||
Comparing source compatibility of against | ||
No changes. | ||
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSampler (not serializable) | ||
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. | ||
+++ NEW SUPERCLASS: java.lang.Object | ||
+++ NEW METHOD: PUBLIC(+) void close() | ||
+++ NEW METHOD: PUBLIC(+) java.lang.String getDescription() | ||
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder newBuilder(io.opentelemetry.sdk.resources.Resource) | ||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.samplers.SamplingResult shouldSample(io.opentelemetry.context.Context, java.lang.String, java.lang.String, io.opentelemetry.api.trace.SpanKind, io.opentelemetry.api.common.Attributes, java.util.List) | ||
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder (not serializable) | ||
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. | ||
+++ NEW SUPERCLASS: java.lang.Object | ||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSampler build() | ||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setEndpoint(java.lang.String) | ||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setInitialSampler(io.opentelemetry.sdk.trace.samplers.Sampler) | ||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setPollingInterval(java.time.Duration) | ||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setPollingInterval(long, java.util.concurrent.TimeUnit) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.sdk.extension.aws.trace; | ||
|
||
import io.opentelemetry.api.common.Attributes; | ||
import io.opentelemetry.api.trace.SpanKind; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.sdk.internal.DaemonThreadFactory; | ||
import io.opentelemetry.sdk.resources.Resource; | ||
import io.opentelemetry.sdk.trace.data.LinkData; | ||
import io.opentelemetry.sdk.trace.samplers.Sampler; | ||
import io.opentelemetry.sdk.trace.samplers.SamplingResult; | ||
import java.io.Closeable; | ||
import java.util.List; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
import javax.annotation.Nullable; | ||
|
||
/** Remote sampler that gets sampling configuration from AWS X-Ray. */ | ||
public final class AwsXrayRemoteSampler implements Sampler, Closeable { | ||
|
||
private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName()); | ||
|
||
private static final String WORKER_THREAD_NAME = | ||
AwsXrayRemoteSampler.class.getSimpleName() + "_WorkerThread"; | ||
|
||
private final Resource resource; | ||
private final Sampler initialSampler; | ||
private final XraySamplerClient client; | ||
private final ScheduledExecutorService executor; | ||
private final ScheduledFuture<?> pollFuture; | ||
|
||
@Nullable private volatile GetSamplingRulesResponse previousRulesResponse; | ||
private volatile Sampler sampler; | ||
|
||
/** | ||
* Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link | ||
* Resource} should be the same as what the {@linkplain io.opentelemetry.sdk.OpenTelemetrySdk | ||
* OpenTelemetry SDK} is configured with. | ||
*/ | ||
// TODO(anuraaga): Deprecate after | ||
// https://github.com/open-telemetry/opentelemetry-specification/issues/1588 | ||
public static AwsXrayRemoteSamplerBuilder newBuilder(Resource resource) { | ||
return new AwsXrayRemoteSamplerBuilder(resource); | ||
} | ||
|
||
AwsXrayRemoteSampler( | ||
Resource resource, String endpoint, Sampler initialSampler, long pollingIntervalNanos) { | ||
this.resource = resource; | ||
this.initialSampler = initialSampler; | ||
client = new XraySamplerClient(endpoint); | ||
executor = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. personally, I'd just assign this in the declaration above, since it's final and doesn't depend on params. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tend to like either all assigned as fields or all in constructor, otherwise it seems like an ugly duck field |
||
Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(WORKER_THREAD_NAME)); | ||
|
||
sampler = initialSampler; | ||
|
||
pollFuture = | ||
executor.scheduleAtFixedRate( | ||
this::getAndUpdateSampler, 0, pollingIntervalNanos, TimeUnit.NANOSECONDS); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe that the polling interval should have a sanity check - that is, if the value is outside of typical, expected values, a warning should be logged. For me it's easy to imagine somebody setting the interval to 1 (assuming it's one second) and degrading the performance with a request every nanosecond ;) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. it looks like all the validation is down in the builder. |
||
} | ||
|
||
@Override | ||
public SamplingResult shouldSample( | ||
Context parentContext, | ||
String traceId, | ||
String name, | ||
SpanKind spanKind, | ||
Attributes attributes, | ||
List<LinkData> parentLinks) { | ||
return sampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
return "AwsXrayRemoteSampler{" + sampler.getDescription() + "}"; | ||
} | ||
|
||
private void getAndUpdateSampler() { | ||
try { | ||
// No pagination support yet, or possibly ever. | ||
GetSamplingRulesResponse response = | ||
client.getSamplingRules(GetSamplingRulesRequest.create(null)); | ||
if (!response.equals(previousRulesResponse)) { | ||
sampler = new SamplingRulesSampler(resource, initialSampler, response.getSamplingRules()); | ||
previousRulesResponse = response; | ||
} | ||
} catch (Throwable t) { | ||
logger.log(Level.FINE, "Failed to update sampler", t); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
pollFuture.cancel(true); | ||
executor.shutdownNow(); | ||
// No flushing behavior so no need to wait for the shutdown. | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.sdk.extension.aws.trace; | ||
|
||
import static io.opentelemetry.api.internal.Utils.checkArgument; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
import io.opentelemetry.sdk.resources.Resource; | ||
import io.opentelemetry.sdk.trace.samplers.Sampler; | ||
import java.time.Duration; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** A builder for {@link AwsXrayRemoteSampler}. */ | ||
public final class AwsXrayRemoteSamplerBuilder { | ||
|
||
private static final String DEFAULT_ENDPOINT = "http://localhost:2000"; | ||
private static final long DEFAULT_POLLING_INTERVAL_SECS = 300; | ||
|
||
private final Resource resource; | ||
|
||
private String endpoint = DEFAULT_ENDPOINT; | ||
private Sampler initialSampler = Sampler.parentBased(Sampler.alwaysOn()); | ||
private long pollingIntervalNanos = TimeUnit.SECONDS.toNanos(DEFAULT_POLLING_INTERVAL_SECS); | ||
|
||
AwsXrayRemoteSamplerBuilder(Resource resource) { | ||
this.resource = resource; | ||
} | ||
|
||
/** | ||
* Sets the endpoint for the TCP proxy to connect to. This is the address to the port on the | ||
* OpenTelemetry Collector configured for proxying X-Ray sampling requests. If unset, defaults to | ||
* {@value DEFAULT_ENDPOINT}. | ||
*/ | ||
public AwsXrayRemoteSamplerBuilder setEndpoint(String endpoint) { | ||
requireNonNull(endpoint, "endpoint"); | ||
this.endpoint = endpoint; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the polling interval for configuration updates. If unset, defaults to {@value | ||
* DEFAULT_POLLING_INTERVAL_SECS}s. Must be positive. | ||
*/ | ||
public AwsXrayRemoteSamplerBuilder setPollingInterval(Duration delay) { | ||
requireNonNull(delay, "delay"); | ||
return setPollingInterval(delay.toNanos(), TimeUnit.NANOSECONDS); | ||
} | ||
|
||
/** | ||
* Sets the polling interval for configuration updates. If unset, defaults to {@value | ||
* DEFAULT_POLLING_INTERVAL_SECS}s. Must be positive. | ||
*/ | ||
public AwsXrayRemoteSamplerBuilder setPollingInterval(long delay, TimeUnit unit) { | ||
requireNonNull(unit, "unit"); | ||
checkArgument(delay >= 0, "delay must be non-negative"); | ||
pollingIntervalNanos = unit.toNanos(delay); | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the initial sampler that is used before sampling configuration is obtained. If unset, | ||
* defaults to a parent-based always-on sampler. | ||
*/ | ||
public AwsXrayRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) { | ||
requireNonNull(initialSampler, "initialSampler"); | ||
this.initialSampler = initialSampler; | ||
return this; | ||
} | ||
|
||
/** Returns a {@link AwsXrayRemoteSampler} with the configuration of this builder. */ | ||
public AwsXrayRemoteSampler build() { | ||
return new AwsXrayRemoteSampler(resource, endpoint, initialSampler, pollingIntervalNanos); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.sdk.extension.aws.trace; | ||
|
||
import io.opentelemetry.api.common.Attributes; | ||
import io.opentelemetry.api.trace.SpanKind; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.sdk.resources.Resource; | ||
import io.opentelemetry.sdk.trace.data.LinkData; | ||
import io.opentelemetry.sdk.trace.samplers.Sampler; | ||
import io.opentelemetry.sdk.trace.samplers.SamplingResult; | ||
import java.util.Arrays; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
final class SamplingRulesSampler implements Sampler { | ||
|
||
private static final Logger logger = Logger.getLogger(SamplingRulesSampler.class.getName()); | ||
|
||
private final Resource resource; | ||
private final Sampler fallbackSampler; | ||
private final SamplingRuleApplier[] ruleAppliers; | ||
|
||
SamplingRulesSampler( | ||
Resource resource, | ||
Sampler fallbackSampler, | ||
List<GetSamplingRulesResponse.SamplingRuleRecord> rules) { | ||
this.resource = resource; | ||
this.fallbackSampler = fallbackSampler; | ||
ruleAppliers = | ||
rules.stream() | ||
.map(GetSamplingRulesResponse.SamplingRuleRecord::getRule) | ||
// Lower priority value takes precedence so normal ascending sort. | ||
.sorted(Comparator.comparingInt(GetSamplingRulesResponse.SamplingRule::getPriority)) | ||
.map(SamplingRuleApplier::new) | ||
.toArray(SamplingRuleApplier[]::new); | ||
} | ||
|
||
@Override | ||
public SamplingResult shouldSample( | ||
Context parentContext, | ||
String traceId, | ||
String name, | ||
SpanKind spanKind, | ||
Attributes attributes, | ||
List<LinkData> parentLinks) { | ||
for (SamplingRuleApplier applier : ruleAppliers) { | ||
if (applier.matches(name, attributes, resource)) { | ||
return applier.shouldSample( | ||
parentContext, traceId, name, spanKind, attributes, parentLinks); | ||
} | ||
} | ||
|
||
// In practice, X-Ray always returns a Default rule that matches all requests so it is a bug in | ||
// our code or X-Ray to reach here, fallback just in case. | ||
logger.log( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👌 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm. In the case this happens, it will totally spam the logs at WARNING for every span. I know it shouldn't ever happen, but should we maybe not be so aggressive with the logging here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True that the fallback loses its meaning if it adds destructive spam on every single span, turned it to FINE (thought about throttling logger but will lose access to it in contrib repo 😿 ) |
||
Level.FINE, | ||
"No sampling rule matched the request. " | ||
+ "This is a bug in either the OpenTelemetry SDK or X-Ray."); | ||
return fallbackSampler.shouldSample( | ||
parentContext, traceId, name, spanKind, attributes, parentLinks); | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
return "XrayRulesSampler{" + Arrays.toString(ruleAppliers) + "}"; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: this is my defensive thinking, but as the constructor is package-protected and thus accessible, I'd validate constructor parameters here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think they're all validated in the builder