From f99a80f42d4c5eff8a539ffca99a865e9e2dffe6 Mon Sep 17 00:00:00 2001 From: Chris Cressman Date: Thu, 28 Sep 2023 17:32:37 -0400 Subject: [PATCH 1/3] [DOCS] Create URLs for docs migrating from Enterprise Search (#100032) Several docs are going to migrate from Enterprise Search to Elasticsearch. Create the new URLs without yet placing the pages into the Elasticsearch navigation. This will allow us to handle redirects for Kibana, docs, and the web without waiting for additional content design decisions. The new URLs will be: - https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest-pipeline-search.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest-pipeline-search-inference.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest-pipeline-search-inference.html#ingest-pipeline-search-inference-update-mapping - https://www.elastic.co/guide/en/elasticsearch/reference/master/nlp-example.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/behavioral-analytics-overview.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/behavioral-analytics-start.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/behavioral-analytics-api.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/behavioral-analytics-event.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/behavioral-analytics-event-reference.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/behavioral-analytics-cors.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-application-overview.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-application-api.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-application-client.html - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-application-security.html --- docs/reference/redirects.asciidoc | 70 +++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/docs/reference/redirects.asciidoc b/docs/reference/redirects.asciidoc index 8e1023c47b929..4ec8c203bbef9 100644 --- a/docs/reference/redirects.asciidoc +++ b/docs/reference/redirects.asciidoc @@ -1932,3 +1932,73 @@ Refer to <>. === Configure roles and users for remote clusters Refer to <>. + +[role="exclude",id="ingest-pipeline-search"] +=== Ingest pipelines for Search indices + +coming::[8.11.0] + +[role="exclude",id="ingest-pipeline-search-inference"] +=== Inference processing for Search indices + +coming::[8.11.0] + +[id="ingest-pipeline-search-inference-update-mapping"] +==== Update mapping + +coming::[8.11.0] + +[role="exclude",id="nlp-example"] +=== Tutorial: Natural language processing (NLP) + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-overview"] +=== Elastic Behavioral Analytics + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-start"] +=== Get started with Behavioral Analytics + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-api"] +=== Behavioral Analytics APIs + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-event"] +=== View Behavioral Analytics Events + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-event-reference"] +=== Behavioral Analytics events reference + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-cors"] +=== Set up CORS for Behavioral Analytics + +coming::[8.11.0] + +[role="exclude",id="search-application-overview"] +=== Elastic Search Applications + +coming::[8.11.0] + +[role="exclude",id="search-application-api"] +=== Search Applications search API and templates + +coming::[8.11.0] + +[role="exclude",id="search-application-client"] +=== Search Applications client + +coming::[8.11.0] + +[role="exclude",id="search-application-security"] +=== Search Applications security + +coming::[8.11.0] From f8d09e9c6cf4b611de3dc7509b0aab6e6a1d5d7a Mon Sep 17 00:00:00 2001 From: Stuart Tettemer Date: Thu, 28 Sep 2023 19:35:46 -0500 Subject: [PATCH 2/3] APM Metering API (#99832) Adds Metering instrument interfaces and adapter implementations for opentelemetry instrument types: * Gauge - a single number that can go up or down * Histogram - bucketed samples * Counter - monotonically increasing summed value * UpDownCounter - summed value that may decrease Supports both Long* and Double* versions of the instruments. Instruments can be registered and retrieved by name through APMMeter which is available via the APMTelemetryProvider. The metering provider starts as the open telemetry noop provider. `telemetry.metrics.enabled` turns on metering. --- docs/changelog/99832.yaml | 5 + .../org/elasticsearch/telemetry/apm/APM.java | 6 +- .../apm/internal/APMAgentSettings.java | 26 +- .../apm/internal/APMTelemetryProvider.java | 8 + .../apm/internal/metrics/APMMeter.java | 180 ++++++++++++++ .../internal/metrics/AbstractInstrument.java | 66 +++++ .../metrics/DoubleCounterAdapter.java | 52 ++++ .../internal/metrics/DoubleGaugeAdapter.java | 42 ++++ .../metrics/DoubleHistogramAdapter.java | 42 ++++ .../metrics/DoubleUpDownCounterAdapter.java | 46 ++++ .../apm/internal/metrics/Instruments.java | 184 ++++++++++++++ .../internal/metrics/LongCounterAdapter.java | 49 ++++ .../internal/metrics/LongGaugeAdapter.java | 46 ++++ .../metrics/LongHistogramAdapter.java | 46 ++++ .../metrics/LongUpDownCounterAdapter.java | 42 ++++ .../apm/internal/metrics/OtelHelper.java | 36 +++ .../apm/internal/tracing/APMTracer.java | 2 +- .../plugin-metadata/plugin-security.policy | 2 + .../apm/internal/metrics/APMMeterTests.java | 85 +++++++ .../metrics/InstrumentsConcurrencyTests.java | 112 +++++++++ .../internal/metrics/InstrumentsTests.java | 77 ++++++ server/src/main/java/module-info.java | 1 + .../telemetry/TelemetryProvider.java | 8 + .../telemetry/metric/DoubleCounter.java | 60 +++++ .../telemetry/metric/DoubleGauge.java | 47 ++++ .../telemetry/metric/DoubleHistogram.java | 48 ++++ .../telemetry/metric/DoubleUpDownCounter.java | 50 ++++ .../telemetry/metric/Instrument.java | 13 + .../telemetry/metric/LongCounter.java | 60 +++++ .../telemetry/metric/LongGauge.java | 49 ++++ .../telemetry/metric/LongHistogram.java | 48 ++++ .../telemetry/metric/LongUpDownCounter.java | 50 ++++ .../elasticsearch/telemetry/metric/Meter.java | 228 ++++++++++++++++++ 33 files changed, 1811 insertions(+), 5 deletions(-) create mode 100644 docs/changelog/99832.yaml create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractInstrument.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleCounterAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleHistogramAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleUpDownCounterAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/Instruments.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongCounterAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongHistogramAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongUpDownCounterAdapter.java create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java create mode 100644 modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeterTests.java create mode 100644 modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsConcurrencyTests.java create mode 100644 modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsTests.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/DoubleCounter.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/DoubleHistogram.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/DoubleUpDownCounter.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/Instrument.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/LongCounter.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/LongHistogram.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/LongUpDownCounter.java create mode 100644 server/src/main/java/org/elasticsearch/telemetry/metric/Meter.java diff --git a/docs/changelog/99832.yaml b/docs/changelog/99832.yaml new file mode 100644 index 0000000000000..9bd83591ba920 --- /dev/null +++ b/docs/changelog/99832.yaml @@ -0,0 +1,5 @@ +pr: 99832 +summary: APM Metering API +area: Infra/Core +type: enhancement +issues: [] diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java index be59eda4a63c2..935c4958ba3d7 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java @@ -27,6 +27,7 @@ import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.apm.internal.APMAgentSettings; import org.elasticsearch.telemetry.apm.internal.APMTelemetryProvider; +import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter; import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; @@ -97,13 +98,16 @@ public Collection createComponents( apmAgentSettings.syncAgentSystemProperties(settings); apmAgentSettings.addClusterSettingsListeners(clusterService, telemetryProvider.get()); - return List.of(apmTracer); + final APMMeter apmMeter = telemetryProvider.get().getMeter(); + + return List.of(apmTracer, apmMeter); } @Override public List> getSettings() { return List.of( APMAgentSettings.APM_ENABLED_SETTING, + APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING, APMAgentSettings.APM_TRACING_NAMES_INCLUDE_SETTING, APMAgentSettings.APM_TRACING_NAMES_EXCLUDE_SETTING, APMAgentSettings.APM_TRACING_SANITIZE_FIELD_NAMES, diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java index 75ca94bb13ad6..e4a194ebe0172 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter; import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer; import java.security.AccessController; @@ -40,14 +41,24 @@ public class APMAgentSettings { * Sensible defaults that Elasticsearch configures. This cannot be done via the APM agent * config file, as then their values could not be overridden dynamically via system properties. */ - static Map APM_AGENT_DEFAULT_SETTINGS = Map.of("transaction_sample_rate", "0.2"); + static Map APM_AGENT_DEFAULT_SETTINGS = Map.of( + "transaction_sample_rate", + "0.2", + "enable_experimental_instrumentations", + "true" + ); public void addClusterSettingsListeners(ClusterService clusterService, APMTelemetryProvider apmTelemetryProvider) { final ClusterSettings clusterSettings = clusterService.getClusterSettings(); final APMTracer apmTracer = apmTelemetryProvider.getTracer(); + final APMMeter apmMeter = apmTelemetryProvider.getMeter(); clusterSettings.addSettingsUpdateConsumer(APM_ENABLED_SETTING, enabled -> { apmTracer.setEnabled(enabled); + this.setAgentSetting("instrument", Boolean.toString(enabled)); + }); + clusterSettings.addSettingsUpdateConsumer(TELEMETRY_METRICS_ENABLED_SETTING, enabled -> { + apmMeter.setEnabled(enabled); // The agent records data other than spans, e.g. JVM metrics, so we toggle this setting in order to // minimise its impact to a running Elasticsearch. this.setAgentSetting("recording", Boolean.toString(enabled)); @@ -106,8 +117,10 @@ public void setAgentSetting(String key, String value) { private static final List PROHIBITED_AGENT_KEYS = List.of( // ES generates a config file and sets this value "config_file", - // ES controls this via `tracing.apm.enabled` - "recording" + // ES controls this via `telemetry.metrics.enabled` + "recording", + // ES controls this via `apm.enabled` + "instrument" ); public static final Setting.AffixSetting APM_AGENT_SETTINGS = Setting.prefixKeySetting( @@ -164,6 +177,13 @@ public void setAgentSetting(String key, String value) { NodeScope ); + public static final Setting TELEMETRY_METRICS_ENABLED_SETTING = Setting.boolSetting( + "telemetry.metrics.enabled", + false, + OperatorDynamic, + NodeScope + ); + public static final Setting APM_SECRET_TOKEN_SETTING = SecureSetting.secureString( APM_SETTING_PREFIX + "secret_token", null diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java index 495afd43bf176..ae9d91cc6ec51 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java @@ -10,19 +10,27 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter; import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer; public class APMTelemetryProvider implements TelemetryProvider { private final Settings settings; private final APMTracer apmTracer; + private final APMMeter apmMeter; public APMTelemetryProvider(Settings settings) { this.settings = settings; apmTracer = new APMTracer(settings); + apmMeter = new APMMeter(settings); } @Override public APMTracer getTracer() { return apmTracer; } + + @Override + public APMMeter getMeter() { + return apmMeter; + } } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeter.java new file mode 100644 index 0000000000000..0a8d425579ca2 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeter.java @@ -0,0 +1,180 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.telemetry.apm.internal.APMTelemetryProvider; +import org.elasticsearch.telemetry.metric.DoubleCounter; +import org.elasticsearch.telemetry.metric.DoubleGauge; +import org.elasticsearch.telemetry.metric.DoubleHistogram; +import org.elasticsearch.telemetry.metric.DoubleUpDownCounter; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongGauge; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.LongUpDownCounter; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.function.Supplier; + +import static org.elasticsearch.telemetry.apm.internal.APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING; + +public class APMMeter extends AbstractLifecycleComponent implements org.elasticsearch.telemetry.metric.Meter { + private final Instruments instruments; + + private final Supplier otelMeterSupplier; + private final Supplier noopMeterSupplier; + + private volatile boolean enabled; + + public APMMeter(Settings settings) { + this(settings, APMMeter.otelMeter(), APMMeter.noopMeter()); + } + + public APMMeter(Settings settings, Supplier otelMeterSupplier, Supplier noopMeterSupplier) { + this.enabled = TELEMETRY_METRICS_ENABLED_SETTING.get(settings); + this.otelMeterSupplier = otelMeterSupplier; + this.noopMeterSupplier = noopMeterSupplier; + this.instruments = new Instruments(enabled ? createOtelMeter() : createNoopMeter()); + } + + /** + * @see org.elasticsearch.telemetry.apm.internal.APMAgentSettings#addClusterSettingsListeners(ClusterService, APMTelemetryProvider) + */ + public void setEnabled(boolean enabled) { + this.enabled = enabled; + if (enabled) { + instruments.setProvider(createOtelMeter()); + } else { + instruments.setProvider(createNoopMeter()); + } + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() { + instruments.setProvider(createNoopMeter()); + } + + @Override + protected void doClose() {} + + @Override + public DoubleCounter registerDoubleCounter(String name, String description, String unit) { + return instruments.registerDoubleCounter(name, description, unit); + } + + @Override + public DoubleCounter getDoubleCounter(String name) { + return instruments.getDoubleCounter(name); + } + + @Override + public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) { + return instruments.registerDoubleUpDownCounter(name, description, unit); + } + + @Override + public DoubleUpDownCounter getDoubleUpDownCounter(String name) { + return instruments.getDoubleUpDownCounter(name); + } + + @Override + public DoubleGauge registerDoubleGauge(String name, String description, String unit) { + return instruments.registerDoubleGauge(name, description, unit); + } + + @Override + public DoubleGauge getDoubleGauge(String name) { + return instruments.getDoubleGauge(name); + } + + @Override + public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) { + return instruments.registerDoubleHistogram(name, description, unit); + } + + @Override + public DoubleHistogram getDoubleHistogram(String name) { + return instruments.getDoubleHistogram(name); + } + + @Override + public LongCounter registerLongCounter(String name, String description, String unit) { + return instruments.registerLongCounter(name, description, unit); + } + + @Override + public LongCounter getLongCounter(String name) { + return instruments.getLongCounter(name); + } + + @Override + public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) { + return instruments.registerLongUpDownCounter(name, description, unit); + } + + @Override + public LongUpDownCounter getLongUpDownCounter(String name) { + return instruments.getLongUpDownCounter(name); + } + + @Override + public LongGauge registerLongGauge(String name, String description, String unit) { + return instruments.registerLongGauge(name, description, unit); + } + + @Override + public LongGauge getLongGauge(String name) { + return instruments.getLongGauge(name); + } + + @Override + public LongHistogram registerLongHistogram(String name, String description, String unit) { + return instruments.registerLongHistogram(name, description, unit); + } + + @Override + public LongHistogram getLongHistogram(String name) { + return instruments.getLongHistogram(name); + } + + Meter createOtelMeter() { + assert this.enabled; + return AccessController.doPrivileged((PrivilegedAction) otelMeterSupplier::get); + } + + private Meter createNoopMeter() { + return noopMeterSupplier.get(); + } + + private static Supplier noopMeter() { + return () -> OpenTelemetry.noop().getMeter("noop"); + } + + // to be used within doPrivileged block + private static Supplier otelMeter() { + var openTelemetry = GlobalOpenTelemetry.get(); + var meter = openTelemetry.getMeter("elasticsearch"); + return () -> meter; + } + + // scope for testing + Instruments getInstruments() { + return instruments; + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractInstrument.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractInstrument.java new file mode 100644 index 0000000000000..d3d485f52bc49 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractInstrument.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.core.Nullable; +import org.elasticsearch.telemetry.metric.Instrument; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An instrument that contains the name, description and unit. The delegate may be replaced when + * the provider is updated. + * Subclasses should implement the builder, which is used on initialization and provider updates. + * @param delegated instrument + */ +public abstract class AbstractInstrument implements Instrument { + private final AtomicReference delegate; + private final String name; + private final String description; + private final String unit; + + public AbstractInstrument(Meter meter, String name, String description, String unit) { + this.name = Objects.requireNonNull(name); + this.description = Objects.requireNonNull(description); + this.unit = Objects.requireNonNull(unit); + this.delegate = new AtomicReference<>(doBuildInstrument(meter)); + } + + private T doBuildInstrument(Meter meter) { + return AccessController.doPrivileged((PrivilegedAction) () -> buildInstrument(meter)); + } + + @Override + public String getName() { + return name; + } + + public String getUnit() { + return unit.toString(); + } + + T getInstrument() { + return delegate.get(); + } + + String getDescription() { + return description; + } + + void setProvider(@Nullable Meter meter) { + delegate.set(doBuildInstrument(Objects.requireNonNull(meter))); + } + + abstract T buildInstrument(Meter meter); +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleCounterAdapter.java new file mode 100644 index 0000000000000..b25ffdff5481b --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleCounterAdapter.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleGaugeAdapter wraps an otel ObservableDoubleMeasurement + */ +public class DoubleCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleCounter { + + public DoubleCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + io.opentelemetry.api.metrics.DoubleCounter buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .counterBuilder(getName()) + .ofDoubles() + .setDescription(getDescription()) + .setUnit(getUnit()) + .build(); + } + + @Override + public void increment() { + getInstrument().add(1d); + } + + @Override + public void incrementBy(double inc) { + assert inc >= 0; + getInstrument().add(inc); + } + + @Override + public void incrementBy(double inc, Map attributes) { + assert inc >= 0; + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java new file mode 100644 index 0000000000000..9d55d475d4a93 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleGaugeAdapter wraps an otel ObservableDoubleMeasurement + */ +public class DoubleGaugeAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleGauge { + + public DoubleGaugeAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.ObservableDoubleMeasurement buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).gaugeBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).buildObserver(); + } + + @Override + public void record(double value) { + getInstrument().record(value); + } + + @Override + public void record(double value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleHistogramAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleHistogramAdapter.java new file mode 100644 index 0000000000000..5fd1a8a189b0f --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleHistogramAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleHistogramAdapter wraps an otel DoubleHistogram + */ +public class DoubleHistogramAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleHistogram { + + public DoubleHistogramAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.DoubleHistogram buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).histogramBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).build(); + } + + @Override + public void record(double value) { + getInstrument().record(value); + } + + @Override + public void record(double value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleUpDownCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleUpDownCounterAdapter.java new file mode 100644 index 0000000000000..9a2fc1b564766 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleUpDownCounterAdapter.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleUpDownCounterAdapter wraps an otel DoubleUpDownCounter + */ +public class DoubleUpDownCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleUpDownCounter { + + public DoubleUpDownCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.DoubleUpDownCounter buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .upDownCounterBuilder(getName()) + .ofDoubles() + .setDescription(getDescription()) + .setUnit(getUnit()) + .build(); + } + + @Override + public void add(double inc) { + getInstrument().add(inc); + } + + @Override + public void add(double inc, Map attributes) { + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/Instruments.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/Instruments.java new file mode 100644 index 0000000000000..92d7d692f0ea5 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/Instruments.java @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.telemetry.metric.DoubleCounter; +import org.elasticsearch.telemetry.metric.DoubleGauge; +import org.elasticsearch.telemetry.metric.DoubleHistogram; +import org.elasticsearch.telemetry.metric.DoubleUpDownCounter; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongGauge; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.LongUpDownCounter; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Container for registering and fetching instruments by type and name. + * Instrument names must be unique for a given type on registration. + * {@link #setProvider(Meter)} is used to change the provider for all existing instruments. + */ +public class Instruments { + private final Registrar doubleCounters = new Registrar<>(); + private final Registrar doubleUpDownCounters = new Registrar<>(); + private final Registrar doubleGauges = new Registrar<>(); + private final Registrar doubleHistograms = new Registrar<>(); + private final Registrar longCounters = new Registrar<>(); + private final Registrar longUpDownCounters = new Registrar<>(); + private final Registrar longGauges = new Registrar<>(); + private final Registrar longHistograms = new Registrar<>(); + + private final Meter meter; + + public Instruments(Meter meter) { + this.meter = meter; + } + + private final List> registrars = List.of( + doubleCounters, + doubleUpDownCounters, + doubleGauges, + doubleHistograms, + longCounters, + longUpDownCounters, + longGauges, + longHistograms + ); + + // Access to registration has to be restricted when the provider is updated in ::setProvider + protected final ReleasableLock registerLock = new ReleasableLock(new ReentrantLock()); + + public DoubleCounter registerDoubleCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleCounters.register(new DoubleCounterAdapter(meter, name, description, unit)); + } + } + + public DoubleCounter getDoubleCounter(String name) { + return doubleCounters.get(name); + } + + public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleUpDownCounters.register(new DoubleUpDownCounterAdapter(meter, name, description, unit)); + } + } + + public DoubleUpDownCounter getDoubleUpDownCounter(String name) { + return doubleUpDownCounters.get(name); + } + + public DoubleGauge registerDoubleGauge(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleGauges.register(new DoubleGaugeAdapter(meter, name, description, unit)); + } + } + + public DoubleGauge getDoubleGauge(String name) { + return doubleGauges.get(name); + } + + public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleHistograms.register(new DoubleHistogramAdapter(meter, name, description, unit)); + } + } + + public DoubleHistogram getDoubleHistogram(String name) { + return doubleHistograms.get(name); + } + + public LongCounter registerLongCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longCounters.register(new LongCounterAdapter(meter, name, description, unit)); + } + } + + public LongCounter getLongCounter(String name) { + return longCounters.get(name); + } + + public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longUpDownCounters.register(new LongUpDownCounterAdapter(meter, name, description, unit)); + } + } + + public LongUpDownCounter getLongUpDownCounter(String name) { + return longUpDownCounters.get(name); + } + + public LongGauge registerLongGauge(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longGauges.register(new LongGaugeAdapter(meter, name, description, unit)); + } + } + + public LongGauge getLongGauge(String name) { + return longGauges.get(name); + } + + public LongHistogram registerLongHistogram(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longHistograms.register(new LongHistogramAdapter(meter, name, description, unit)); + } + } + + public LongHistogram getLongHistogram(String name) { + return longHistograms.get(name); + } + + public void setProvider(Meter meter) { + try (ReleasableLock lock = registerLock.acquire()) { + for (Registrar registrar : registrars) { + registrar.setProvider(meter); + } + } + } + + /** + * A typed wrapper for a instrument that + * @param + */ + private static class Registrar> { + private final Map registered = ConcurrentCollections.newConcurrentMap(); + + T register(T instrument) { + registered.compute(instrument.getName(), (k, v) -> { + if (v != null) { + throw new IllegalStateException( + instrument.getClass().getSimpleName() + "[" + instrument.getName() + "] already registered" + ); + } + + return instrument; + }); + return instrument; + } + + T get(String name) { + return registered.get(name); + } + + void setProvider(Meter meter) { + registered.forEach((k, v) -> v.setProvider(meter)); + } + } + + // scope for testing + Meter getMeter() { + return meter; + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongCounterAdapter.java new file mode 100644 index 0000000000000..122d16d9e1aa4 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongCounterAdapter.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongCounterAdapter wraps an otel LongCounter + */ +public class LongCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongCounter { + + public LongCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.LongCounter buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).counterBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).build(); + } + + @Override + public void increment() { + getInstrument().add(1L); + } + + @Override + public void incrementBy(long inc) { + assert inc >= 0; + getInstrument().add(inc); + } + + @Override + public void incrementBy(long inc, Map attributes) { + assert inc >= 0; + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java new file mode 100644 index 0000000000000..48430285a5173 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongGaugeAdapter wraps an otel ObservableLongMeasurement + */ +public class LongGaugeAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongGauge { + + public LongGaugeAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.ObservableLongMeasurement buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .gaugeBuilder(getName()) + .ofLongs() + .setDescription(getDescription()) + .setUnit(getUnit()) + .buildObserver(); + } + + @Override + public void record(long value) { + getInstrument().record(value); + } + + @Override + public void record(long value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongHistogramAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongHistogramAdapter.java new file mode 100644 index 0000000000000..bb5be4866e7b7 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongHistogramAdapter.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongHistogramAdapter wraps an otel LongHistogram + */ +public class LongHistogramAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongHistogram { + + public LongHistogramAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.LongHistogram buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .histogramBuilder(getName()) + .ofLongs() + .setDescription(getDescription()) + .setUnit(getUnit()) + .build(); + } + + @Override + public void record(long value) { + getInstrument().record(value); + } + + @Override + public void record(long value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongUpDownCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongUpDownCounterAdapter.java new file mode 100644 index 0000000000000..e5af85e4ed192 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongUpDownCounterAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongUpDownCounterAdapter wraps an otel LongUpDownCounter + */ +public class LongUpDownCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongUpDownCounter { + + public LongUpDownCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.LongUpDownCounter buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).upDownCounterBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).build(); + } + + @Override + public void add(long inc) { + getInstrument().add(inc); + } + + @Override + public void add(long inc, Map attributes) { + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java new file mode 100644 index 0000000000000..673025a1a41f4 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.common.Attributes; + +import java.util.Map; + +class OtelHelper { + static Attributes fromMap(Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return Attributes.empty(); + } + var builder = Attributes.builder(); + attributes.forEach((k, v) -> { + if (v instanceof String value) { + builder.put(k, value); + } else if (v instanceof Long value) { + builder.put(k, value); + } else if (v instanceof Double value) { + builder.put(k, value); + } else if (v instanceof Boolean value) { + builder.put(k, value); + } else { + throw new IllegalArgumentException("attributes do not support value type of [" + v.getClass().getCanonicalName() + "]"); + } + }); + return builder.build(); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java index daedb90047975..9f9fdb3dc26ef 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java @@ -150,7 +150,6 @@ APMServices createApmServices() { return AccessController.doPrivileged((PrivilegedAction) () -> { var openTelemetry = GlobalOpenTelemetry.get(); var tracer = openTelemetry.getTracer("elasticsearch", Version.CURRENT.toString()); - return new APMServices(tracer, openTelemetry); }); } @@ -452,4 +451,5 @@ private static Automaton patternsToAutomaton(List patterns) { } return Operations.union(automata); } + } diff --git a/modules/apm/src/main/plugin-metadata/plugin-security.policy b/modules/apm/src/main/plugin-metadata/plugin-security.policy index b85d3ec05c277..57da3a2efd301 100644 --- a/modules/apm/src/main/plugin-metadata/plugin-security.policy +++ b/modules/apm/src/main/plugin-metadata/plugin-security.policy @@ -11,6 +11,8 @@ grant { permission java.lang.RuntimePermission "createClassLoader"; permission java.lang.RuntimePermission "getClassLoader"; permission java.util.PropertyPermission "elastic.apm.*", "write"; + permission java.util.PropertyPermission "*", "read,write"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; }; grant codeBase "${codebase.elastic-apm-agent}" { diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeterTests.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeterTests.java new file mode 100644 index 0000000000000..1064b8820b089 --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeterTests.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.telemetry.apm.internal.APMAgentSettings; +import org.elasticsearch.telemetry.metric.DoubleCounter; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.sameInstance; + +public class APMMeterTests extends ESTestCase { + Meter testOtel = OpenTelemetry.noop().getMeter("test"); + + Meter noopOtel = OpenTelemetry.noop().getMeter("noop"); + + public void testMeterIsSetUponConstruction() { + // test default + APMMeter apmMeter = new APMMeter(Settings.EMPTY, () -> testOtel, () -> noopOtel); + + Meter meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + + // test explicitly enabled + var settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(testOtel)); + + // test explicitly disabled + settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + } + + public void testMeterIsOverridden() { + APMMeter apmMeter = new APMMeter(Settings.EMPTY, () -> testOtel, () -> noopOtel); + + Meter meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + + apmMeter.setEnabled(true); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(testOtel)); + } + + public void testLookupByName() { + var settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + + var apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + + DoubleCounter registeredCounter = apmMeter.registerDoubleCounter("name", "desc", "unit"); + DoubleCounter lookedUpCounter = apmMeter.getDoubleCounter("name"); + + assertThat(lookedUpCounter, sameInstance(registeredCounter)); + } + + public void testNoopIsSetOnStop() { + var settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + APMMeter apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + apmMeter.start(); + + Meter meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(testOtel)); + + apmMeter.stop(); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + } + +} diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsConcurrencyTests.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsConcurrencyTests.java new file mode 100644 index 0000000000000..51285894f27ee --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsConcurrencyTests.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.DoubleCounterBuilder; +import io.opentelemetry.api.metrics.DoubleGaugeBuilder; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.LongUpDownCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class InstrumentsConcurrencyTests extends ESTestCase { + String name = "name"; + String description = "desc"; + String unit = "kg"; + Meter noopMeter = OpenTelemetry.noop().getMeter("noop"); + CountDownLatch registerLatch = new CountDownLatch(1); + Meter lockingMeter = new Meter() { + @Override + public LongCounterBuilder counterBuilder(String name) { + return new LockingLongCounterBuilder(); + } + + @Override + public LongUpDownCounterBuilder upDownCounterBuilder(String name) { + return null; + } + + @Override + public DoubleHistogramBuilder histogramBuilder(String name) { + return null; + } + + @Override + public DoubleGaugeBuilder gaugeBuilder(String name) { + return null; + } + }; + + class LockingLongCounterBuilder implements LongCounterBuilder { + + @Override + public LongCounterBuilder setDescription(String description) { + return this; + } + + @Override + public LongCounterBuilder setUnit(String unit) { + return this; + } + + @Override + public DoubleCounterBuilder ofDoubles() { + return null; + } + + @Override + public LongCounter build() { + try { + registerLatch.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + } + + @Override + public ObservableLongCounter buildWithCallback(Consumer callback) { + return null; + } + } + + public void testLockingWhenRegistering() throws Exception { + Instruments instruments = new Instruments(lockingMeter); + + var registerThread = new Thread(() -> instruments.registerLongCounter(name, description, unit)); + // registerThread has a countDown latch that is simulating a long-running registration + registerThread.start(); + var setProviderThread = new Thread(() -> instruments.setProvider(noopMeter)); + // a setProviderThread will attempt to override a meter, but will wait to acquireLock + setProviderThread.start(); + + // assert that a thread is waiting for a lock during long-running registration + assertBusy(() -> assertThat(setProviderThread.getState(), equalTo(Thread.State.WAITING))); + // assert that the old lockingMeter is still in place + assertBusy(() -> assertThat(instruments.getMeter(), sameInstance(lockingMeter))); + + // finish long-running registration + registerLatch.countDown(); + // assert that a meter was overriden + assertBusy(() -> assertThat(instruments.getMeter(), sameInstance(lockingMeter))); + + } +} diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsTests.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsTests.java new file mode 100644 index 0000000000000..daf511fcf7042 --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsTests.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class InstrumentsTests extends ESTestCase { + Meter noopMeter = OpenTelemetry.noop().getMeter("noop"); + Meter someOtherMeter = OpenTelemetry.noop().getMeter("xyz"); + String name = "name"; + String description = "desc"; + String unit = "kg"; + + public void testRegistrationAndLookup() { + Instruments instruments = new Instruments(noopMeter); + { + var registered = instruments.registerDoubleCounter(name, description, unit); + var lookedUp = instruments.getDoubleCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerDoubleUpDownCounter(name, description, unit); + var lookedUp = instruments.getDoubleUpDownCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerDoubleGauge(name, description, unit); + var lookedUp = instruments.getDoubleGauge(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerDoubleHistogram(name, description, unit); + var lookedUp = instruments.getDoubleHistogram(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongCounter(name, description, unit); + var lookedUp = instruments.getLongCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongUpDownCounter(name, description, unit); + var lookedUp = instruments.getLongUpDownCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongGauge(name, description, unit); + var lookedUp = instruments.getLongGauge(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongHistogram(name, description, unit); + var lookedUp = instruments.getLongHistogram(name); + assertThat(registered, sameInstance(lookedUp)); + } + } + + public void testNameValidation() { + Instruments instruments = new Instruments(noopMeter); + + instruments.registerLongHistogram(name, description, unit); + var e = expectThrows(IllegalStateException.class, () -> instruments.registerLongHistogram(name, description, unit)); + assertThat(e.getMessage(), equalTo("LongHistogramAdapter[name] already registered")); + } +} diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 99ce5910c9775..1a082e7558577 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -385,6 +385,7 @@ org.elasticsearch.serverless.apifiltering; exports org.elasticsearch.telemetry.tracing; exports org.elasticsearch.telemetry; + exports org.elasticsearch.telemetry.metric; provides java.util.spi.CalendarDataProvider with org.elasticsearch.common.time.IsoCalendarDataProvider; provides org.elasticsearch.xcontent.ErrorOnUnknown with org.elasticsearch.common.xcontent.SuggestingErrorOnUnknown; diff --git a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java index 0df8aeedac7f8..add994787227f 100644 --- a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java +++ b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java @@ -8,11 +8,15 @@ package org.elasticsearch.telemetry; +import org.elasticsearch.telemetry.metric.Meter; import org.elasticsearch.telemetry.tracing.Tracer; public interface TelemetryProvider { + Tracer getTracer(); + Meter getMeter(); + TelemetryProvider NOOP = new TelemetryProvider() { @Override @@ -20,5 +24,9 @@ public Tracer getTracer() { return Tracer.NOOP; } + @Override + public Meter getMeter() { + return Meter.NOOP; + } }; } diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleCounter.java new file mode 100644 index 0000000000000..c98701bb0a1bb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleCounter.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A monotonically increasing metric that uses a double. + * Useful for capturing the number of bytes received, number of requests, etc. + */ +public interface DoubleCounter extends Instrument { + /** + * Add one to the current counter. + */ + void increment(); + + /** + * Increment the counter. + * @param inc amount to increment, non-negative + */ + void incrementBy(double inc); + + /** + * Increment the counter. + * @param inc amount to increment, non-negative + * @param attributes key-value pairs to associate with this increment + */ + void incrementBy(double inc, Map attributes); + + /** + * Noop counter for use in tests. + */ + DoubleCounter NOOP = new DoubleCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void increment() { + + } + + @Override + public void incrementBy(double inc) { + + } + + @Override + public void incrementBy(double inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java new file mode 100644 index 0000000000000..797c125900bb8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record non-additive double values. eg number of running threads, current load + */ +public interface DoubleGauge extends Instrument { + /** + * Record the current value for measured item + */ + void record(double value); + + /** + * Record the current value + * @param attributes key-value pairs to associate with the current measurement + */ + void record(double value, Map attributes); + + /** + * Noop gauge for tests + */ + DoubleGauge NOOP = new DoubleGauge() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(double value) { + + } + + @Override + public void record(double value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleHistogram.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleHistogram.java new file mode 100644 index 0000000000000..11958ea36cd3d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleHistogram.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record arbitrary values that are summarized statistically, useful for percentiles and histograms. + */ +public interface DoubleHistogram extends Instrument { + /** + * Record a sample for the measured item + * @param value + */ + void record(double value); + + /** + * Record a sample for the measured item + * @param attributes key-value pairs to associate with the current sample + */ + void record(double value, Map attributes); + + /** + * Noop histogram for tests + */ + DoubleHistogram NOOP = new DoubleHistogram() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(double value) { + + } + + @Override + public void record(double value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleUpDownCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleUpDownCounter.java new file mode 100644 index 0000000000000..7d484ebf07d32 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleUpDownCounter.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A counter that supports decreasing and increasing values. + * Useful for capturing the number of requests in a queue. + */ +public interface DoubleUpDownCounter extends Instrument { + /** + * Add to the counter + * @param inc may be negative. + */ + void add(double inc); + + /** + * Add to the counter + * @param inc may be negative. + * @param attributes key-value pairs to associate with this increment + */ + void add(double inc, Map attributes); + + /** + * Noop counter for use in tests + */ + DoubleUpDownCounter NOOP = new DoubleUpDownCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void add(double inc) { + + } + + @Override + public void add(double inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/Instrument.java b/server/src/main/java/org/elasticsearch/telemetry/metric/Instrument.java new file mode 100644 index 0000000000000..19a7e259120f2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/Instrument.java @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +public interface Instrument { + String getName(); +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongCounter.java new file mode 100644 index 0000000000000..f8f2150163835 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongCounter.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A monotonically increasing metric that uses a long. Useful for integral values such as the number of bytes received, + * number of requests, etc. + */ +public interface LongCounter extends Instrument { + /** + * Add one to the current counter + */ + void increment(); + + /** + * Increment the counter + * @param inc amount to increment + */ + void incrementBy(long inc); + + /** + * Increment the counter. + * @param inc amount to increment + * @param attributes key-value pairs to associate with this increment + */ + void incrementBy(long inc, Map attributes); + + /** + * Noop counter for use in tests. + */ + LongCounter NOOP = new LongCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void increment() { + + } + + @Override + public void incrementBy(long inc) { + + } + + @Override + public void incrementBy(long inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java new file mode 100644 index 0000000000000..71539064ce53e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record non-additive long values. + */ +public interface LongGauge extends Instrument { + + /** + * Record the current value of the measured item. + * @param value + */ + void record(long value); + + /** + * Record the current value + * @param attributes key-value pairs to associate with the current measurement + */ + void record(long value, Map attributes); + + /** + * Noop gauge for tests + */ + LongGauge NOOP = new LongGauge() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(long value) { + + } + + @Override + public void record(long value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongHistogram.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongHistogram.java new file mode 100644 index 0000000000000..27d5261f755ef --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongHistogram.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record arbitrary values that are summarized statistically, useful for percentiles and histograms. + */ +public interface LongHistogram extends Instrument { + /** + * Record a sample for the measured item + * @param value + */ + void record(long value); + + /** + * Record a sample for the measured item + * @param attributes key-value pairs to associate with the current sample + */ + void record(long value, Map attributes); + + /** + * Noop histogram for tests + */ + LongHistogram NOOP = new LongHistogram() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(long value) { + + } + + @Override + public void record(long value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongUpDownCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongUpDownCounter.java new file mode 100644 index 0000000000000..f62030da8f6bd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongUpDownCounter.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A counter that supports decreasing and increasing values. + * Useful for capturing the number of requests in a queue. + */ +public interface LongUpDownCounter extends Instrument { + /** + * Add to the counter + * @param inc may be negative. + */ + void add(long inc); + + /** + * Add to the counter + * @param inc may be negative. + * @param attributes key-value pairs to associate with this increment + */ + void add(long inc, Map attributes); + + /** + * Noop counter for use in tests + */ + LongUpDownCounter NOOP = new LongUpDownCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void add(long inc) { + + } + + @Override + public void add(long inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/Meter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/Meter.java new file mode 100644 index 0000000000000..77bbf6f673fd3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/Meter.java @@ -0,0 +1,228 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +/** + * Container for metering instruments. Meters with the same name and type (DoubleCounter, etc) can + * only be registered once. + * TODO(stu): describe name, unit and description + */ +public interface Meter { + /** + * Register a {@link DoubleCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleCounter registerDoubleCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleCounter}. + * @param name name of the counter + * @return the registered meter. + */ + DoubleCounter getDoubleCounter(String name); + + /** + * Register a {@link DoubleUpDownCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleUpDownCounter}. + * @param name name of the counter + * @return the registered meter. + */ + DoubleUpDownCounter getDoubleUpDownCounter(String name); + + /** + * Register a {@link DoubleGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleGauge registerDoubleGauge(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleGauge}. + * @param name name of the gauge + * @return the registered meter. + */ + DoubleGauge getDoubleGauge(String name); + + /** + * Register a {@link DoubleHistogram}. The returned object may be reused. + * @param name name of the histogram + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleHistogram registerDoubleHistogram(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleHistogram}. + * @param name name of the histogram + * @return the registered meter. + */ + DoubleHistogram getDoubleHistogram(String name); + + /** + * Register a {@link LongCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongCounter registerLongCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongCounter}. + * @param name name of the counter + * @return the registered meter. + */ + LongCounter getLongCounter(String name); + + /** + * Register a {@link LongUpDownCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongUpDownCounter}. + * @param name name of the counter + * @return the registered meter. + */ + LongUpDownCounter getLongUpDownCounter(String name); + + /** + * Register a {@link LongGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongGauge registerLongGauge(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongGauge}. + * @param name name of the gauge + * @return the registered meter. + */ + LongGauge getLongGauge(String name); + + /** + * Register a {@link LongHistogram}. The returned object may be reused. + * @param name name of the histogram + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongHistogram registerLongHistogram(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongHistogram}. + * @param name name of the histogram + * @return the registered meter. + */ + LongHistogram getLongHistogram(String name); + + /** + * Noop implementation for tests + */ + Meter NOOP = new Meter() { + @Override + public DoubleCounter registerDoubleCounter(String name, String description, String unit) { + return DoubleCounter.NOOP; + } + + @Override + public DoubleCounter getDoubleCounter(String name) { + return DoubleCounter.NOOP; + } + + public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) { + return DoubleUpDownCounter.NOOP; + } + + @Override + public DoubleUpDownCounter getDoubleUpDownCounter(String name) { + return DoubleUpDownCounter.NOOP; + } + + @Override + public DoubleGauge registerDoubleGauge(String name, String description, String unit) { + return DoubleGauge.NOOP; + } + + @Override + public DoubleGauge getDoubleGauge(String name) { + return DoubleGauge.NOOP; + } + + @Override + public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) { + return DoubleHistogram.NOOP; + } + + @Override + public DoubleHistogram getDoubleHistogram(String name) { + return DoubleHistogram.NOOP; + } + + @Override + public LongCounter registerLongCounter(String name, String description, String unit) { + return LongCounter.NOOP; + } + + @Override + public LongCounter getLongCounter(String name) { + return LongCounter.NOOP; + } + + @Override + public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) { + return LongUpDownCounter.NOOP; + } + + @Override + public LongUpDownCounter getLongUpDownCounter(String name) { + return LongUpDownCounter.NOOP; + } + + @Override + public LongGauge registerLongGauge(String name, String description, String unit) { + return LongGauge.NOOP; + } + + @Override + public LongGauge getLongGauge(String name) { + return LongGauge.NOOP; + } + + @Override + public LongHistogram registerLongHistogram(String name, String description, String unit) { + return LongHistogram.NOOP; + } + + @Override + public LongHistogram getLongHistogram(String name) { + return LongHistogram.NOOP; + } + }; +} From 44a2d68f197094dc3af8d053ff166e0228256a71 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 28 Sep 2023 17:45:17 -0700 Subject: [PATCH 3/3] Replace recursive with loop in PackedValuesBlockHash (#99992) This change replaces the recursion with a loop when packing and hashing multiple keys. While the recursive version is clever, it may not be as straightforward for future readers. Using a loop also helps us avoid StackOverflow when grouping by a large number of keys. --- .../blockhash/PackedValuesBlockHash.java | 192 +++++++++--------- 1 file changed, 95 insertions(+), 97 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index 31f65e9b70053..7ecaddf2092fa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -22,8 +22,6 @@ import org.elasticsearch.compute.operator.BatchEncoder; import org.elasticsearch.compute.operator.HashAggregationOperator; import org.elasticsearch.compute.operator.MultivalueDedupe; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import java.util.Arrays; import java.util.List; @@ -51,19 +49,20 @@ * } */ final class PackedValuesBlockHash extends BlockHash { - private static final Logger logger = LogManager.getLogger(PackedValuesBlockHash.class); static final int DEFAULT_BATCH_SIZE = Math.toIntExact(ByteSizeValue.ofKb(10).getBytes()); - private final List groups; private final int emitBatchSize; private final BytesRefHash bytesRefHash; private final int nullTrackingBytes; + private final BytesRef scratch = new BytesRef(); + private final BytesRefBuilder bytes = new BytesRefBuilder(); + private final Group[] groups; - PackedValuesBlockHash(List groups, BigArrays bigArrays, int emitBatchSize) { - this.groups = groups; + PackedValuesBlockHash(List specs, BigArrays bigArrays, int emitBatchSize) { + this.groups = specs.stream().map(Group::new).toArray(Group[]::new); this.emitBatchSize = emitBatchSize; this.bytesRefHash = new BytesRefHash(1, bigArrays); - this.nullTrackingBytes = groups.size() / 8 + 1; + this.nullTrackingBytes = (groups.length + 7) / 8; } @Override @@ -75,23 +74,28 @@ void add(Page page, GroupingAggregatorFunction.AddInput addInput, int batchSize) new AddWork(page, addInput, batchSize).add(); } + private static class Group { + final HashAggregationOperator.GroupSpec spec; + BatchEncoder encoder; + int positionOffset; + int valueOffset; + int loopedIndex; + int valueCount; + int bytesStart; + + Group(HashAggregationOperator.GroupSpec spec) { + this.spec = spec; + } + } + class AddWork extends LongLongBlockHash.AbstractAddBlock { - final BatchEncoder[] encoders = new BatchEncoder[groups.size()]; - final int[] positionOffsets = new int[groups.size()]; - final int[] valueOffsets = new int[groups.size()]; - final BytesRef[] scratches = new BytesRef[groups.size()]; - final BytesRefBuilder bytes = new BytesRefBuilder(); final int positionCount; - int position; - int count; - int bufferedGroup; AddWork(Page page, GroupingAggregatorFunction.AddInput addInput, int batchSize) { super(emitBatchSize, addInput); - for (int g = 0; g < groups.size(); g++) { - encoders[g] = MultivalueDedupe.batchEncoder(page.getBlock(groups.get(g).channel()), batchSize); - scratches[g] = new BytesRef(); + for (Group group : groups) { + group.encoder = MultivalueDedupe.batchEncoder(page.getBlock(group.spec.channel()), batchSize); } bytes.grow(nullTrackingBytes); this.positionCount = page.getPositionCount(); @@ -104,91 +108,86 @@ class AddWork extends LongLongBlockHash.AbstractAddBlock { */ void add() { for (position = 0; position < positionCount; position++) { - if (logger.isTraceEnabled()) { - logger.trace("position {}", position); - } // Make sure all encoders have encoded the current position and the offsets are queued to it's start - for (int g = 0; g < encoders.length; g++) { - positionOffsets[g]++; - while (positionOffsets[g] >= encoders[g].positionCount()) { - encoders[g].encodeNextBatch(); - positionOffsets[g] = 0; - valueOffsets[g] = 0; + boolean singleEntry = true; + for (Group g : groups) { + var encoder = g.encoder; + g.positionOffset++; + while (g.positionOffset >= encoder.positionCount()) { + encoder.encodeNextBatch(); + g.positionOffset = 0; + g.valueOffset = 0; } + g.valueCount = encoder.valueCount(g.positionOffset); + singleEntry &= (g.valueCount == 1); } - - count = 0; Arrays.fill(bytes.bytes(), 0, nullTrackingBytes, (byte) 0); bytes.setLength(nullTrackingBytes); - addPosition(0); - switch (count) { - case 0 -> throw new IllegalStateException("didn't find any values"); - case 1 -> { - ords.appendInt(bufferedGroup); - addedValue(position); - } - default -> ords.endPositionEntry(); - } - for (int g = 0; g < encoders.length; g++) { - valueOffsets[g] += encoders[g].valueCount(positionOffsets[g]); + if (singleEntry) { + addSingleEntry(); + } else { + addMultipleEntries(); } } emitOrds(); } - private void addPosition(int g) { - if (g == groups.size()) { - addBytes(); - return; - } - int start = bytes.length(); - int count = encoders[g].valueCount(positionOffsets[g]); - assert count > 0; - int valueOffset = valueOffsets[g]; - BytesRef v = encoders[g].read(valueOffset++, scratches[g]); - if (logger.isTraceEnabled()) { - logger.trace("\t".repeat(g + 1) + v); - } - if (v.length == 0) { - assert count == 1 : "null value in non-singleton list"; - int nullByte = g / 8; - int nullShift = g % 8; - bytes.bytes()[nullByte] |= (byte) (1 << nullShift); - } - bytes.setLength(start); - bytes.append(v); - addPosition(g + 1); // TODO stack overflow protection - for (int i = 1; i < count; i++) { - v = encoders[g].read(valueOffset++, scratches[g]); - if (logger.isTraceEnabled()) { - logger.trace("\t".repeat(g + 1) + v); + private void addSingleEntry() { + for (int g = 0; g < groups.length; g++) { + Group group = groups[g]; + BytesRef v = group.encoder.read(group.valueOffset++, scratch); + if (v.length == 0) { + int nullByte = g / 8; + int nullShift = g % 8; + bytes.bytes()[nullByte] |= (byte) (1 << nullShift); + } else { + bytes.append(v); } - assert v.length > 0 : "null value after the first position"; - bytes.setLength(start); - bytes.append(v); - addPosition(g + 1); } + int ord = Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))); + ords.appendInt(ord); + addedValue(position); } - private void addBytes() { - int group = Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))); - switch (count) { - case 0 -> bufferedGroup = group; - case 1 -> { - ords.beginPositionEntry(); - ords.appendInt(bufferedGroup); - addedValueInMultivaluePosition(position); - ords.appendInt(group); - addedValueInMultivaluePosition(position); + private void addMultipleEntries() { + ords.beginPositionEntry(); + int g = 0; + outer: for (;;) { + for (; g < groups.length; g++) { + Group group = groups[g]; + group.bytesStart = bytes.length(); + BytesRef v = group.encoder.read(group.valueOffset + group.loopedIndex, scratch); + ++group.loopedIndex; + if (v.length == 0) { + assert group.valueCount == 1 : "null value in non-singleton list"; + int nullByte = g / 8; + int nullShift = g % 8; + bytes.bytes()[nullByte] |= (byte) (1 << nullShift); + } else { + bytes.append(v); + } } - default -> { - ords.appendInt(group); - addedValueInMultivaluePosition(position); + // emit ords + int ord = Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))); + ords.appendInt(ord); + addedValueInMultivaluePosition(position); + + // rewind + Group group = groups[--g]; + bytes.setLength(group.bytesStart); + while (group.loopedIndex == group.valueCount) { + group.loopedIndex = 0; + if (g == 0) { + break outer; + } else { + group = groups[--g]; + bytes.setLength(group.bytesStart); + } } } - count++; - if (logger.isTraceEnabled()) { - logger.trace("{} = {}", bytes.get(), group); + ords.endPositionEntry(); + for (Group group : groups) { + group.valueOffset += group.valueCount; } } } @@ -196,16 +195,16 @@ private void addBytes() { @Override public Block[] getKeys() { int size = Math.toIntExact(bytesRefHash.size()); - BatchEncoder.Decoder[] decoders = new BatchEncoder.Decoder[groups.size()]; - Block.Builder[] builders = new Block.Builder[groups.size()]; + BatchEncoder.Decoder[] decoders = new BatchEncoder.Decoder[groups.length]; + Block.Builder[] builders = new Block.Builder[groups.length]; for (int g = 0; g < builders.length; g++) { - ElementType elementType = groups.get(g).elementType(); + ElementType elementType = groups[g].spec.elementType(); decoders[g] = BatchEncoder.decoder(elementType); builders[g] = elementType.newBlockBuilder(size); } - BytesRef values[] = new BytesRef[(int) Math.min(100, bytesRefHash.size())]; - BytesRef nulls[] = new BytesRef[values.length]; + BytesRef[] values = new BytesRef[(int) Math.min(100, bytesRefHash.size())]; + BytesRef[] nulls = new BytesRef[values.length]; for (int offset = 0; offset < values.length; offset++) { values[offset] = new BytesRef(); nulls[offset] = new BytesRef(); @@ -231,7 +230,7 @@ public Block[] getKeys() { readKeys(decoders, builders, nulls, values, offset); } - Block[] keyBlocks = new Block[groups.size()]; + Block[] keyBlocks = new Block[groups.length]; for (int g = 0; g < keyBlocks.length; g++) { keyBlocks[g] = builders[g].build(); } @@ -271,13 +270,12 @@ public String toString() { StringBuilder b = new StringBuilder(); b.append("PackedValuesBlockHash{groups=["); boolean first = true; - for (HashAggregationOperator.GroupSpec spec : groups) { - if (first) { - first = false; - } else { + for (int i = 0; i < groups.length; i++) { + if (i > 0) { b.append(", "); } - b.append(spec.channel()).append(':').append(spec.elementType()); + Group group = groups[i]; + b.append(group.spec.channel()).append(':').append(group.spec.elementType()); } b.append("], entries=").append(bytesRefHash.size()); b.append(", size=").append(ByteSizeValue.ofBytes(bytesRefHash.ramBytesUsed()));