Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[router][common] Multiple fixes in Opentelemetry #1483

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public class VeniceMetricsConfig {
*/
public static final String OTEL_VENICE_METRICS_EXPORT_TO_ENDPOINT = "otel.venice.metrics.export.to.endpoint";

/**
* Export interval in seconds for OpenTelemetry metrics
*/
public static final String OTEL_VENICE_METRICS_EXPORT_INTERVAL_IN_SECONDS =
"otel.venice.metrics.export.interval.in.seconds";

/**
* Config Map to add custom dimensions to the metrics: Can be used for system dimensions
* amongst other custom dimensions <br>
Expand Down Expand Up @@ -130,6 +136,7 @@ public class VeniceMetricsConfig {
* 2. {@link VeniceOpenTelemetryMetricsRepository.LogBasedMetricExporter} for debug purposes
*/
private final boolean exportOtelMetricsToEndpoint;
private final int exportOtelMetricsIntervalInSeconds;
private final boolean exportOtelMetricsToLog;

/** Custom dimensions */
Expand Down Expand Up @@ -165,6 +172,7 @@ private VeniceMetricsConfig(Builder builder) {
this.metricEntities = builder.metricEntities;
this.emitOTelMetrics = builder.emitOtelMetrics;
this.exportOtelMetricsToEndpoint = builder.exportOtelMetricsToEndpoint;
this.exportOtelMetricsIntervalInSeconds = builder.exportOtelMetricsIntervalInSeconds;
this.otelCustomDimensionsMap = builder.otelCustomDimensionsMap;
this.otelExportProtocol = builder.otelExportProtocol;
this.otelEndpoint = builder.otelEndpoint;
Expand All @@ -184,6 +192,7 @@ public static class Builder {
private Collection<MetricEntity> metricEntities = new ArrayList<>();
private boolean emitOtelMetrics = false;
private boolean exportOtelMetricsToEndpoint = false;
private int exportOtelMetricsIntervalInSeconds = 60;
private Map<String, String> otelCustomDimensionsMap = new HashMap<>();
private String otelExportProtocol = OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF;
private String otelEndpoint = null;
Expand Down Expand Up @@ -222,6 +231,11 @@ public Builder setExportOtelMetricsToEndpoint(boolean exportOtelMetricsToEndpoin
return this;
}

public Builder setExportOtelMetricsIntervalInSeconds(int exportOtelMetricsIntervalInSeconds) {
this.exportOtelMetricsIntervalInSeconds = exportOtelMetricsIntervalInSeconds;
return this;
}

public Builder setOtelExportProtocol(String otelExportProtocol) {
this.otelExportProtocol = otelExportProtocol;
return this;
Expand Down Expand Up @@ -289,6 +303,10 @@ public Builder extractAndSetOtelConfigs(Map<String, String> configs) {
setExportOtelMetricsToEndpoint(Boolean.parseBoolean(configValue));
}

if ((configValue = configs.get(OTEL_VENICE_METRICS_EXPORT_INTERVAL_IN_SECONDS)) != null) {
setExportOtelMetricsIntervalInSeconds(Integer.parseInt(configValue));
}

/**
* custom dimensions are passed as key=value pairs separated by '=' <br>
* Multiple dimensions are separated by ','
Expand Down Expand Up @@ -426,6 +444,10 @@ public boolean exportOtelMetricsToEndpoint() {
return exportOtelMetricsToEndpoint;
}

public int getExportOtelMetricsIntervalInSeconds() {
return exportOtelMetricsIntervalInSeconds;
}

public Map<String, String> getOtelCustomDimensionsMap() {
return otelCustomDimensionsMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentSelectorBuilder;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
Expand All @@ -34,6 +33,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -97,19 +97,20 @@ private void setExponentialHistogramAggregation(SdkMeterProviderBuilder builder,
}
}

// Build an InstrumentSelector with multiple setName calls for all Exponential Histogram metrics
InstrumentSelectorBuilder selectorBuilder = InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM);
metricNames.forEach(selectorBuilder::setName);

// Register a single view with all metric names included in the InstrumentSelector
builder.registerView(
selectorBuilder.build(),
View.builder()
.setAggregation(
Aggregation.base2ExponentialBucketHistogram(
metricsConfig.getOtelExponentialHistogramMaxBuckets(),
metricsConfig.getOtelExponentialHistogramMaxScale()))
.build());
// Register views for all MetricType.HISTOGRAM metrics to be aggregated/exported as exponential histograms
for (String metricName: metricNames) {
InstrumentSelector selector =
InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).setName(metricName).build();

builder.registerView(
selector,
View.builder()
.setAggregation(
Aggregation.base2ExponentialBucketHistogram(
metricsConfig.getOtelExponentialHistogramMaxBuckets(),
metricsConfig.getOtelExponentialHistogramMaxScale()))
.build());
}
}

public VeniceOpenTelemetryMetricsRepository(VeniceMetricsConfig metricsConfig) {
Expand All @@ -129,11 +130,17 @@ public VeniceOpenTelemetryMetricsRepository(VeniceMetricsConfig metricsConfig) {
SdkMeterProviderBuilder builder = SdkMeterProvider.builder();
if (metricsConfig.exportOtelMetricsToEndpoint()) {
MetricExporter httpExporter = getOtlpHttpMetricExporter(metricsConfig);
builder.registerMetricReader(PeriodicMetricReader.builder(httpExporter).build());
builder.registerMetricReader(
PeriodicMetricReader.builder(httpExporter)
.setInterval(metricsConfig.getExportOtelMetricsIntervalInSeconds(), TimeUnit.SECONDS)
.build());
}
if (metricsConfig.exportOtelMetricsToLog()) {
// internal to test: Disabled by default
builder.registerMetricReader(PeriodicMetricReader.builder(new LogBasedMetricExporter(metricsConfig)).build());
builder.registerMetricReader(
PeriodicMetricReader.builder(new LogBasedMetricExporter(metricsConfig))
.setInterval(metricsConfig.getExportOtelMetricsIntervalInSeconds(), TimeUnit.SECONDS)
.build());
}

if (metricsConfig.useOtelExponentialHistogram()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* to account for all healthy requests. This dimensions makes it easier to make Venice specific aggregations.
*/
public enum VeniceResponseStatusCategory {
HEALTHY, UNHEALTHY, TARDY, THROTTLED, BAD_REQUEST;
SUCCESS, FAIL;
ZacAttack marked this conversation as resolved.
Show resolved Hide resolved

private final String category;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,48 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.tehuti.metrics.MeasurableStat;
import io.tehuti.metrics.Sensor;
import io.tehuti.utils.RedundantLogFilter;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* Operational state of a metric. It holds <br>
* 1. {@link MetricEntity}
* 2. 1 Otel Instrument and
* 3. multiple tehuti Sensors for this Otel Metric
* Operational state of a metric. It holds: <br>
* 1. A {@link MetricEntity} <br>
* 2. One OpenTelemetry (Otel) Instrument <br>
* 3. Zero or one (out of zero for new metrics or more for existing metrics) Tehuti sensors for this Otel Metric. <br>
*
* One Otel instrument can cover multiple Tehuti sensors through the use of dimensions. Ideally, this class should represent a one-to-many
* mapping between an Otel instrument and Tehuti sensors. However, to simplify lookup during runtime, this class holds one Otel instrument
* and one Tehuti sensor. If an Otel instrument corresponds to multiple Tehuti sensors, there will be multiple {@link MetricEntityState}
* objects, each containing the same Otel instrument but different Tehuti sensors.
*/
public class MetricEntityState {
private static final Logger LOGGER = LogManager.getLogger(MetricEntityState.class);
private static final RedundantLogFilter REDUNDANT_LOG_FILTER = RedundantLogFilter.getRedundantLogFilter();
private final MetricEntity metricEntity;
/** Otel metric */
private Object otelMetric = null;
/** Map of tehuti names and sensors: 1 Otel metric can cover multiple Tehuti sensors */
private Map<TehutiMetricNameEnum, Sensor> tehutiSensors = null;
/** Respective tehuti metric */
private Sensor tehutiSensor = null;

public MetricEntityState(MetricEntity metricEntity, VeniceOpenTelemetryMetricsRepository otelRepository) {
this.metricEntity = metricEntity;
setOtelMetric(otelRepository.createInstrument(this.metricEntity));
this(metricEntity, otelRepository, null, null, Collections.EMPTY_LIST);
}

public MetricEntityState(
MetricEntity metricEntity,
VeniceOpenTelemetryMetricsRepository otelRepository,
TehutiSensorRegistrationFunction registerTehutiSensor,
Map<TehutiMetricNameEnum, List<MeasurableStat>> tehutiMetricInput) {
TehutiSensorRegistrationFunction registerTehutiSensorFn,
TehutiMetricNameEnum tehutiMetricNameEnum,
List<MeasurableStat> tehutiMetricStats) {
this.metricEntity = metricEntity;
createMetric(otelRepository, tehutiMetricInput, registerTehutiSensor);
createMetric(otelRepository, tehutiMetricNameEnum, tehutiMetricStats, registerTehutiSensorFn);
}

public void setOtelMetric(Object otelMetric) {
this.otelMetric = otelMetric;
}

/**
* Add Tehuti {@link Sensor} to tehutiSensors map and throw exception if sensor with same name already exists
*/
public void addTehutiSensors(TehutiMetricNameEnum name, Sensor tehutiSensor) {
if (tehutiSensors == null) {
tehutiSensors = new HashMap<>();
}
if (tehutiSensors.put(name, tehutiSensor) != null) {
throw new IllegalArgumentException("Sensor with name '" + name + "' already exists.");
}
public void setTehutiSensor(Sensor tehutiSensor) {
this.tehutiSensor = tehutiSensor;
}

/**
Expand All @@ -69,18 +60,18 @@ public interface TehutiSensorRegistrationFunction {

public void createMetric(
VeniceOpenTelemetryMetricsRepository otelRepository,
Map<TehutiMetricNameEnum, List<MeasurableStat>> tehutiMetricInput,
TehutiSensorRegistrationFunction registerTehutiSensor) {
TehutiMetricNameEnum tehutiMetricNameEnum,
List<MeasurableStat> tehutiMetricStats,
TehutiSensorRegistrationFunction registerTehutiSensorFn) {
// Otel metric: otelRepository will be null if otel is not enabled
if (otelRepository != null) {
setOtelMetric(otelRepository.createInstrument(this.metricEntity));
}
// tehuti metric
for (Map.Entry<TehutiMetricNameEnum, List<MeasurableStat>> entry: tehutiMetricInput.entrySet()) {
addTehutiSensors(
entry.getKey(),
registerTehutiSensor
.register(entry.getKey().getMetricName(), entry.getValue().toArray(new MeasurableStat[0])));
if (tehutiMetricStats != null && !tehutiMetricStats.isEmpty()) {
setTehutiSensor(
registerTehutiSensorFn
.register(tehutiMetricNameEnum.getMetricName(), tehutiMetricStats.toArray(new MeasurableStat[0])));
}
}

Expand All @@ -105,38 +96,29 @@ void recordOtelMetric(double value, Attributes otelDimensions) {
}
}

void recordTehutiMetric(TehutiMetricNameEnum tehutiMetricNameEnum, double value) {
if (tehutiSensors != null) {
Sensor sensor = tehutiSensors.get(tehutiMetricNameEnum);
if (sensor != null) {
sensor.record(value);
} else {
// Log using Redundant log filters to catch any bad tehutiMetricNameEnum is passed in
String errorLog = "Tehuti Sensor with name '" + tehutiMetricNameEnum + "' not found.";
if (!REDUNDANT_LOG_FILTER.isRedundantLog(errorLog)) {
LOGGER.error(errorLog);
}
}
void recordTehutiMetric(double value) {
if (tehutiSensor != null) {
tehutiSensor.record(value);
}
}

public void record(TehutiMetricNameEnum tehutiMetricNameEnum, long value, Attributes otelDimensions) {
public void record(long value, Attributes otelDimensions) {
recordOtelMetric(value, otelDimensions);
recordTehutiMetric(tehutiMetricNameEnum, value);
recordTehutiMetric(value);
}

public void record(TehutiMetricNameEnum tehutiMetricNameEnum, double value, Attributes otelDimensions) {
public void record(double value, Attributes otelDimensions) {
recordOtelMetric(value, otelDimensions);
recordTehutiMetric(tehutiMetricNameEnum, value);
recordTehutiMetric(value);
}

/** used only for testing */
Map<TehutiMetricNameEnum, Sensor> getTehutiSensors() {
return tehutiSensors;
Sensor getTehutiSensor() {
return tehutiSensor;
}

/** used only for testing */
static RedundantLogFilter getRedundantLogFilter() {
return REDUNDANT_LOG_FILTER;
Object getOtelMetric() {
return otelMetric;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void testDefaultValuesWithBasicConfig() {
assertEquals(config.getMetricPrefix(), "service");
assertFalse(config.emitOtelMetrics());
assertFalse(config.exportOtelMetricsToEndpoint());
assertEquals(config.getExportOtelMetricsIntervalInSeconds(), 60);
assertEquals(config.getOtelExportProtocol(), OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF);
assertEquals(config.getOtelEndpoint(), null);
assertTrue(config.getOtelHeaders().isEmpty());
Expand All @@ -63,13 +64,15 @@ public void testCustomValues() {
.setMetricPrefix("TestPrefix")
.setTehutiMetricConfig(metricConfig)
.extractAndSetOtelConfigs(otelConfigs)
.setExportOtelMetricsIntervalInSeconds(10)
.build();

assertEquals(config.getServiceName(), "TestService");
assertEquals(config.getMetricPrefix(), "TestPrefix");
assertTrue(config.emitOtelMetrics());
assertTrue(config.exportOtelMetricsToLog());
assertEquals(config.getTehutiMetricConfig(), metricConfig);
assertEquals(config.getExportOtelMetricsIntervalInSeconds(), 10);
}

@Test(expectedExceptions = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void setUp() {
when(mockMetricsConfig.getServiceName()).thenReturn("test_service");
when(mockMetricsConfig.exportOtelMetricsToEndpoint()).thenReturn(true);
when(mockMetricsConfig.getOtelEndpoint()).thenReturn("http://localhost:4318");
when(mockMetricsConfig.getExportOtelMetricsIntervalInSeconds()).thenReturn(60);

metricsRepository = new VeniceOpenTelemetryMetricsRepository(mockMetricsConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,11 @@ public class VeniceResponseStatusCategoryTest {
public void testVeniceResponseStatusCategory() {
for (VeniceResponseStatusCategory responseStatusCategory: VeniceResponseStatusCategory.values()) {
switch (responseStatusCategory) {
case HEALTHY:
assertEquals(responseStatusCategory.getCategory(), "healthy");
case SUCCESS:
assertEquals(responseStatusCategory.getCategory(), "success");
break;
case UNHEALTHY:
assertEquals(responseStatusCategory.getCategory(), "unhealthy");
break;
case TARDY:
assertEquals(responseStatusCategory.getCategory(), "tardy");
break;
case THROTTLED:
assertEquals(responseStatusCategory.getCategory(), "throttled");
break;
case BAD_REQUEST:
assertEquals(responseStatusCategory.getCategory(), "bad_request");
case FAIL:
assertEquals(responseStatusCategory.getCategory(), "fail");
break;
default:
throw new IllegalArgumentException("Unknown response status category: " + responseStatusCategory);
Expand Down
Loading
Loading