Skip to content

Commit

Permalink
APM Metering API (elastic#99832)
Browse files Browse the repository at this point in the history
Adds Metering instrument interfaces and adapter implementations for opentelemetry instrument types:
* Gauge - a single number that can go up or down
* Histogram - bucketed samples
* Counter - monotonically increasing summed value
* UpDownCounter - summed value that may decrease

Supports both Long* and Double* versions of the instruments.

Instruments can be registered and retrieved by name through APMMeter which is available via the APMTelemetryProvider.

The metering provider starts as the open telemetry noop provider.

`telemetry.metrics.enabled` turns on metering.
  • Loading branch information
stu-elastic authored and piergm committed Oct 2, 2023
1 parent 56706d0 commit c1fa624
Show file tree
Hide file tree
Showing 33 changed files with 1,811 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/99832.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99832
summary: APM Metering API
area: Infra/Core
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.apm.internal.APMAgentSettings;
import org.elasticsearch.telemetry.apm.internal.APMTelemetryProvider;
import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter;
import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -97,13 +98,16 @@ public Collection<Object> createComponents(
apmAgentSettings.syncAgentSystemProperties(settings);
apmAgentSettings.addClusterSettingsListeners(clusterService, telemetryProvider.get());

return List.of(apmTracer);
final APMMeter apmMeter = telemetryProvider.get().getMeter();

return List.of(apmTracer, apmMeter);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(
APMAgentSettings.APM_ENABLED_SETTING,
APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING,
APMAgentSettings.APM_TRACING_NAMES_INCLUDE_SETTING,
APMAgentSettings.APM_TRACING_NAMES_EXCLUDE_SETTING,
APMAgentSettings.APM_TRACING_SANITIZE_FIELD_NAMES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter;
import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer;

import java.security.AccessController;
Expand All @@ -40,14 +41,24 @@ public class APMAgentSettings {
* Sensible defaults that Elasticsearch configures. This cannot be done via the APM agent
* config file, as then their values could not be overridden dynamically via system properties.
*/
static Map<String, String> APM_AGENT_DEFAULT_SETTINGS = Map.of("transaction_sample_rate", "0.2");
static Map<String, String> APM_AGENT_DEFAULT_SETTINGS = Map.of(
"transaction_sample_rate",
"0.2",
"enable_experimental_instrumentations",
"true"
);

public void addClusterSettingsListeners(ClusterService clusterService, APMTelemetryProvider apmTelemetryProvider) {
final ClusterSettings clusterSettings = clusterService.getClusterSettings();
final APMTracer apmTracer = apmTelemetryProvider.getTracer();
final APMMeter apmMeter = apmTelemetryProvider.getMeter();

clusterSettings.addSettingsUpdateConsumer(APM_ENABLED_SETTING, enabled -> {
apmTracer.setEnabled(enabled);
this.setAgentSetting("instrument", Boolean.toString(enabled));
});
clusterSettings.addSettingsUpdateConsumer(TELEMETRY_METRICS_ENABLED_SETTING, enabled -> {
apmMeter.setEnabled(enabled);
// The agent records data other than spans, e.g. JVM metrics, so we toggle this setting in order to
// minimise its impact to a running Elasticsearch.
this.setAgentSetting("recording", Boolean.toString(enabled));
Expand Down Expand Up @@ -106,8 +117,10 @@ public void setAgentSetting(String key, String value) {
private static final List<String> PROHIBITED_AGENT_KEYS = List.of(
// ES generates a config file and sets this value
"config_file",
// ES controls this via `tracing.apm.enabled`
"recording"
// ES controls this via `telemetry.metrics.enabled`
"recording",
// ES controls this via `apm.enabled`
"instrument"
);

public static final Setting.AffixSetting<String> APM_AGENT_SETTINGS = Setting.prefixKeySetting(
Expand Down Expand Up @@ -164,6 +177,13 @@ public void setAgentSetting(String key, String value) {
NodeScope
);

public static final Setting<Boolean> TELEMETRY_METRICS_ENABLED_SETTING = Setting.boolSetting(
"telemetry.metrics.enabled",
false,
OperatorDynamic,
NodeScope
);

public static final Setting<SecureString> APM_SECRET_TOKEN_SETTING = SecureSetting.secureString(
APM_SETTING_PREFIX + "secret_token",
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,27 @@

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter;
import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer;

public class APMTelemetryProvider implements TelemetryProvider {
private final Settings settings;
private final APMTracer apmTracer;
private final APMMeter apmMeter;

public APMTelemetryProvider(Settings settings) {
this.settings = settings;
apmTracer = new APMTracer(settings);
apmMeter = new APMMeter(settings);
}

@Override
public APMTracer getTracer() {
return apmTracer;
}

@Override
public APMMeter getMeter() {
return apmMeter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.telemetry.apm.internal.metrics;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.telemetry.apm.internal.APMTelemetryProvider;
import org.elasticsearch.telemetry.metric.DoubleCounter;
import org.elasticsearch.telemetry.metric.DoubleGauge;
import org.elasticsearch.telemetry.metric.DoubleHistogram;
import org.elasticsearch.telemetry.metric.DoubleUpDownCounter;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongGauge;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.LongUpDownCounter;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.function.Supplier;

import static org.elasticsearch.telemetry.apm.internal.APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING;

public class APMMeter extends AbstractLifecycleComponent implements org.elasticsearch.telemetry.metric.Meter {
private final Instruments instruments;

private final Supplier<Meter> otelMeterSupplier;
private final Supplier<Meter> noopMeterSupplier;

private volatile boolean enabled;

public APMMeter(Settings settings) {
this(settings, APMMeter.otelMeter(), APMMeter.noopMeter());
}

public APMMeter(Settings settings, Supplier<Meter> otelMeterSupplier, Supplier<Meter> noopMeterSupplier) {
this.enabled = TELEMETRY_METRICS_ENABLED_SETTING.get(settings);
this.otelMeterSupplier = otelMeterSupplier;
this.noopMeterSupplier = noopMeterSupplier;
this.instruments = new Instruments(enabled ? createOtelMeter() : createNoopMeter());
}

/**
* @see org.elasticsearch.telemetry.apm.internal.APMAgentSettings#addClusterSettingsListeners(ClusterService, APMTelemetryProvider)
*/
public void setEnabled(boolean enabled) {
this.enabled = enabled;
if (enabled) {
instruments.setProvider(createOtelMeter());
} else {
instruments.setProvider(createNoopMeter());
}
}

@Override
protected void doStart() {}

@Override
protected void doStop() {
instruments.setProvider(createNoopMeter());
}

@Override
protected void doClose() {}

@Override
public DoubleCounter registerDoubleCounter(String name, String description, String unit) {
return instruments.registerDoubleCounter(name, description, unit);
}

@Override
public DoubleCounter getDoubleCounter(String name) {
return instruments.getDoubleCounter(name);
}

@Override
public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) {
return instruments.registerDoubleUpDownCounter(name, description, unit);
}

@Override
public DoubleUpDownCounter getDoubleUpDownCounter(String name) {
return instruments.getDoubleUpDownCounter(name);
}

@Override
public DoubleGauge registerDoubleGauge(String name, String description, String unit) {
return instruments.registerDoubleGauge(name, description, unit);
}

@Override
public DoubleGauge getDoubleGauge(String name) {
return instruments.getDoubleGauge(name);
}

@Override
public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) {
return instruments.registerDoubleHistogram(name, description, unit);
}

@Override
public DoubleHistogram getDoubleHistogram(String name) {
return instruments.getDoubleHistogram(name);
}

@Override
public LongCounter registerLongCounter(String name, String description, String unit) {
return instruments.registerLongCounter(name, description, unit);
}

@Override
public LongCounter getLongCounter(String name) {
return instruments.getLongCounter(name);
}

@Override
public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) {
return instruments.registerLongUpDownCounter(name, description, unit);
}

@Override
public LongUpDownCounter getLongUpDownCounter(String name) {
return instruments.getLongUpDownCounter(name);
}

@Override
public LongGauge registerLongGauge(String name, String description, String unit) {
return instruments.registerLongGauge(name, description, unit);
}

@Override
public LongGauge getLongGauge(String name) {
return instruments.getLongGauge(name);
}

@Override
public LongHistogram registerLongHistogram(String name, String description, String unit) {
return instruments.registerLongHistogram(name, description, unit);
}

@Override
public LongHistogram getLongHistogram(String name) {
return instruments.getLongHistogram(name);
}

Meter createOtelMeter() {
assert this.enabled;
return AccessController.doPrivileged((PrivilegedAction<Meter>) otelMeterSupplier::get);
}

private Meter createNoopMeter() {
return noopMeterSupplier.get();
}

private static Supplier<Meter> noopMeter() {
return () -> OpenTelemetry.noop().getMeter("noop");
}

// to be used within doPrivileged block
private static Supplier<Meter> otelMeter() {
var openTelemetry = GlobalOpenTelemetry.get();
var meter = openTelemetry.getMeter("elasticsearch");
return () -> meter;
}

// scope for testing
Instruments getInstruments() {
return instruments;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.telemetry.apm.internal.metrics;

import io.opentelemetry.api.metrics.Meter;

import org.elasticsearch.core.Nullable;
import org.elasticsearch.telemetry.metric.Instrument;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/**
* An instrument that contains the name, description and unit. The delegate may be replaced when
* the provider is updated.
* Subclasses should implement the builder, which is used on initialization and provider updates.
* @param <T> delegated instrument
*/
public abstract class AbstractInstrument<T> implements Instrument {
private final AtomicReference<T> delegate;
private final String name;
private final String description;
private final String unit;

public AbstractInstrument(Meter meter, String name, String description, String unit) {
this.name = Objects.requireNonNull(name);
this.description = Objects.requireNonNull(description);
this.unit = Objects.requireNonNull(unit);
this.delegate = new AtomicReference<>(doBuildInstrument(meter));
}

private T doBuildInstrument(Meter meter) {
return AccessController.doPrivileged((PrivilegedAction<T>) () -> buildInstrument(meter));
}

@Override
public String getName() {
return name;
}

public String getUnit() {
return unit.toString();
}

T getInstrument() {
return delegate.get();
}

String getDescription() {
return description;
}

void setProvider(@Nullable Meter meter) {
delegate.set(doBuildInstrument(Objects.requireNonNull(meter)));
}

abstract T buildInstrument(Meter meter);
}
Loading

0 comments on commit c1fa624

Please sign in to comment.