Skip to content

Commit

Permalink
[Tracing Framework] Adding support of Head based Sampler based on tra…
Browse files Browse the repository at this point in the history
…ce header & probability (opensearch-project#8918)

Signed-off-by: Dev Agarwal <[email protected]>
  • Loading branch information
devagarwal1803 committed Aug 25, 2023
1 parent e1c40b4 commit 7c505f2
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,23 @@ public List<Setting<?>> getSettings() {
OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING,
OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING,
OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING,
OTelTelemetrySettings.OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING
OTelTelemetrySettings.OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING,
OTelTelemetrySettings.TRACER_HEAD_SAMPLER_SAMPLING_RATIO
);
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(telemetry());
return Optional.of(telemetry(settings));
}

@Override
public String getName() {
return OTEL_TRACER_NAME;
}

private Telemetry telemetry() {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(settings)), new MetricsTelemetry() {
private Telemetry telemetry(TelemetrySettings telemetrySettings) {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(this.settings, telemetrySettings.getClusterSettings())), new MetricsTelemetry() {
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.telemetry;

import org.opensearch.SpecialPermission;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;

Expand All @@ -29,7 +31,7 @@ public final class OTelTelemetrySettings {
* Base Constructor.
*/
private OTelTelemetrySettings() {}

private volatile double samplingProbability;
/**
* span exporter batch size
*/
Expand Down Expand Up @@ -60,6 +62,34 @@ private OTelTelemetrySettings() {}
Setting.Property.Final
);

/**
* Probability of sampler
*/
public static final Setting<Double> TRACER_HEAD_SAMPLER_SAMPLING_RATIO = Setting.doubleSetting(
"telemetry.otel.tracer.head.sample.sampling.ratio",
0.001d,
0.000d,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Constructor
* @param settings setting
* @param clusterSettings cluster settings
*/
public OTelTelemetrySettings(Settings settings, ClusterSettings clusterSettings) {
this.samplingProbability = TRACER_HEAD_SAMPLER_SAMPLING_RATIO.get(settings);
clusterSettings.addSettingsUpdateConsumer(TRACER_HEAD_SAMPLER_SAMPLING_RATIO, this::setSamplingProbability);
}

/**
* Get sampling ratio
*/
public double getTracerHeadSamplerSamplingRatio(){
return samplingProbability;
}

/**
* Span Exporter type setting.
*/
Expand All @@ -83,4 +113,12 @@ private OTelTelemetrySettings() {}
Setting.Property.NodeScope,
Setting.Property.Final
);

/**
* Set sampling ratio
* @param samplingProbability double
*/
public void setSamplingProbability(double samplingProbability) {
this.samplingProbability = samplingProbability;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.OTelTelemetrySettings;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;

import java.util.concurrent.TimeUnit;
Expand All @@ -24,6 +26,8 @@
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import org.opensearch.telemetry.tracing.sampler.HeadSampler;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;

import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
Expand All @@ -38,14 +42,15 @@ private OTelResourceProvider() {}
/**
* Creates OpenTelemetry instance with default configuration
* @param settings cluster settings
* @param clusterSettings cluster settings
* @return OpenTelemetry instance
*/
public static OpenTelemetry get(Settings settings) {
public static OpenTelemetry get(Settings settings, ClusterSettings clusterSettings) {
return get(
settings,
OTelSpanExporterFactory.create(settings),
ContextPropagators.create(W3CTraceContextPropagator.getInstance()),
Sampler.alwaysOn()
Sampler.parentBased(new HeadSampler(new ProbabilisticSampler(new OTelTelemetrySettings(settings, clusterSettings))))
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

import java.util.List;

/**
* HeadBased sampler
*/
public class HeadSampler implements Sampler {
private final Sampler defaultSampler;

/**
* Creates Head based sampler
* @param defaultSampler defaultSampler
*/
public HeadSampler(Sampler defaultSampler) {
this.defaultSampler = defaultSampler;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks) {

if (attributes.get(AttributeKey.stringKey("trace")) != null && Boolean.parseBoolean(attributes.get(AttributeKey.stringKey("trace"))) ) {
return SamplingResult.recordAndSample();
} else {
return defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}
}

@Override
public String getDescription() {
return "HeadSampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.opensearch.telemetry.tracing.sampler;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import org.opensearch.telemetry.OTelTelemetrySettings;

import java.util.List;

/**
* ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
*/
public class ProbabilisticSampler implements Sampler {
private Sampler defaultSampler;
private final OTelTelemetrySettings oTelTelemetrySettings;

private double tracerHeadSamplerSamplingRatio;

/**
* Constructor
*
* @param oTelTelemetrySettings Telemetry settings.
*/
public ProbabilisticSampler(OTelTelemetrySettings oTelTelemetrySettings) {
if (oTelTelemetrySettings == null) {
throw new IllegalArgumentException("oTelTelemetrySettings cannot be null");
}
this.oTelTelemetrySettings = oTelTelemetrySettings;
this.tracerHeadSamplerSamplingRatio = oTelTelemetrySettings.getTracerHeadSamplerSamplingRatio();
this.defaultSampler = createProbabilitySampler(tracerHeadSamplerSamplingRatio);
}

private Sampler createProbabilitySampler(double samplingRatio) {
return Sampler.traceIdRatioBased(samplingRatio);
}

Sampler getSampler() {
double newSamplingRatio = oTelTelemetrySettings.getTracerHeadSamplerSamplingRatio();
if(isSamplingRatioChanged(newSamplingRatio)){
this.tracerHeadSamplerSamplingRatio = newSamplingRatio;
defaultSampler = createProbabilitySampler(tracerHeadSamplerSamplingRatio);
}
return defaultSampler;
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
return Double.compare(this.tracerHeadSamplerSamplingRatio, newSamplingRatio)!=0;
}
double getTracerHeadSamplerSamplingRatio() {
return tracerHeadSamplerSamplingRatio;
}
@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks) {
return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

@Override
public String getDescription() {
return "Probabilistic Sampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for HeadBased Sampler.
*/
package org.opensearch.telemetry.tracing.sampler;
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_HEAD_SAMPLER_SAMPLING_RATIO;

public class OTelTelemetryPluginTests extends OpenSearchTestCase {

Expand All @@ -42,10 +43,15 @@ public void setup() {
// io.opentelemetry.sdk.OpenTelemetrySdk.close waits only for 10 seconds for shutdown to complete.
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
oTelTracerModulePlugin = new OTelTelemetryPlugin(settings);
telemetry = oTelTracerModulePlugin.getTelemetry(null);
telemetry = oTelTracerModulePlugin.getTelemetry(new TelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings())));
tracingTelemetry = telemetry.get().getTracingTelemetry();
}

private Set<Setting<?>> getClusterSettings() {
Set<Setting<?>> allTracerSettings = new HashSet<>();
allTracerSettings.add(TRACER_HEAD_SAMPLER_SAMPLING_RATIO);
ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.get(List.of(FeatureFlags.TELEMETRY)).stream().forEach((allTracerSettings::add));
return allTracerSettings;
}
public void testGetTelemetry() {
Set<Setting<?>> allTracerSettings = new HashSet<>();
ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.get(List.of(FeatureFlags.TELEMETRY)).stream().forEach((allTracerSettings::add));
Expand All @@ -56,7 +62,8 @@ public void testGetTelemetry() {
TRACER_EXPORTER_BATCH_SIZE_SETTING,
TRACER_EXPORTER_DELAY_SETTING,
TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING,
OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING
OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING,
TRACER_HEAD_SAMPLER_SAMPLING_RATIO
),
oTelTracerModulePlugin.getSettings()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import org.opensearch.test.OpenSearchTestCase;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.never;
import io.opentelemetry.api.common.Attributes;

import java.util.Collections;

public class HeadSamplerTests extends OpenSearchTestCase {

public void testShouldSampleWithTraceAttributeAsTrue() {

// Create a mock default sampler
Sampler defaultSampler = mock(Sampler.class);
when(defaultSampler.shouldSample(any(), anyString(), anyString(), any(), any(), any()))
.thenReturn(SamplingResult.drop());

// Create an instance of HeadSampler with the mock default sampler
HeadSampler headSampler = new HeadSampler(defaultSampler);

// Create a mock Context and Attributes
Context parentContext = mock(Context.class);
Attributes attributes = Attributes.of(AttributeKey.stringKey("trace"), "true");

// Call shouldSample on HeadSampler
SamplingResult result = headSampler.shouldSample(parentContext, "traceId", "spanName",
SpanKind.INTERNAL, attributes, Collections.emptyList());

assertEquals(SamplingResult.recordAndSample(), result);

// Verify that the default sampler's shouldSample method was not called
verify(defaultSampler, never()).shouldSample(any(), anyString(), anyString(), any(), any(), any());
}


public void testShouldSampleWithoutTraceAttribute() {

// Create a mock default sampler
Sampler defaultSampler = mock(Sampler.class);
when(defaultSampler.shouldSample(any(), anyString(), anyString(), any(), any(), any()))
.thenReturn(SamplingResult.recordAndSample());

// Create an instance of HeadSampler with the mock default sampler
HeadSampler headSampler = new HeadSampler(defaultSampler);

// Create a mock Context and Attributes
Context parentContext = mock(Context.class);
Attributes attributes = Attributes.empty();

// Call shouldSample on HeadSampler
SamplingResult result = headSampler.shouldSample(parentContext, "traceId", "spanName",
SpanKind.INTERNAL, attributes, Collections.emptyList());

// Verify that HeadSampler returned SamplingResult.recordAndSample()
assertEquals(SamplingResult.recordAndSample(), result);

// Verify that the default sampler's shouldSample method was called
verify(defaultSampler).shouldSample(any(), anyString(), anyString(), any(), any(), any());
}

}
Loading

0 comments on commit 7c505f2

Please sign in to comment.