diff --git a/docs/changelog/99832.yaml b/docs/changelog/99832.yaml new file mode 100644 index 0000000000000..9bd83591ba920 --- /dev/null +++ b/docs/changelog/99832.yaml @@ -0,0 +1,5 @@ +pr: 99832 +summary: APM Metering API +area: Infra/Core +type: enhancement +issues: [] diff --git a/docs/reference/redirects.asciidoc b/docs/reference/redirects.asciidoc index 8e1023c47b929..4ec8c203bbef9 100644 --- a/docs/reference/redirects.asciidoc +++ b/docs/reference/redirects.asciidoc @@ -1932,3 +1932,73 @@ Refer to <>. === Configure roles and users for remote clusters Refer to <>. + +[role="exclude",id="ingest-pipeline-search"] +=== Ingest pipelines for Search indices + +coming::[8.11.0] + +[role="exclude",id="ingest-pipeline-search-inference"] +=== Inference processing for Search indices + +coming::[8.11.0] + +[id="ingest-pipeline-search-inference-update-mapping"] +==== Update mapping + +coming::[8.11.0] + +[role="exclude",id="nlp-example"] +=== Tutorial: Natural language processing (NLP) + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-overview"] +=== Elastic Behavioral Analytics + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-start"] +=== Get started with Behavioral Analytics + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-api"] +=== Behavioral Analytics APIs + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-event"] +=== View Behavioral Analytics Events + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-event-reference"] +=== Behavioral Analytics events reference + +coming::[8.11.0] + +[role="exclude",id="behavioral-analytics-cors"] +=== Set up CORS for Behavioral Analytics + +coming::[8.11.0] + +[role="exclude",id="search-application-overview"] +=== Elastic Search Applications + +coming::[8.11.0] + +[role="exclude",id="search-application-api"] +=== Search Applications search API and templates + +coming::[8.11.0] + +[role="exclude",id="search-application-client"] +=== Search Applications client + +coming::[8.11.0] + +[role="exclude",id="search-application-security"] +=== Search Applications security + +coming::[8.11.0] diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java index be59eda4a63c2..935c4958ba3d7 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APM.java @@ -27,6 +27,7 @@ import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.apm.internal.APMAgentSettings; import org.elasticsearch.telemetry.apm.internal.APMTelemetryProvider; +import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter; import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; @@ -97,13 +98,16 @@ public Collection createComponents( apmAgentSettings.syncAgentSystemProperties(settings); apmAgentSettings.addClusterSettingsListeners(clusterService, telemetryProvider.get()); - return List.of(apmTracer); + final APMMeter apmMeter = telemetryProvider.get().getMeter(); + + return List.of(apmTracer, apmMeter); } @Override public List> getSettings() { return List.of( APMAgentSettings.APM_ENABLED_SETTING, + APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING, APMAgentSettings.APM_TRACING_NAMES_INCLUDE_SETTING, APMAgentSettings.APM_TRACING_NAMES_EXCLUDE_SETTING, APMAgentSettings.APM_TRACING_SANITIZE_FIELD_NAMES, diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java index 75ca94bb13ad6..e4a194ebe0172 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMAgentSettings.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter; import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer; import java.security.AccessController; @@ -40,14 +41,24 @@ public class APMAgentSettings { * Sensible defaults that Elasticsearch configures. This cannot be done via the APM agent * config file, as then their values could not be overridden dynamically via system properties. */ - static Map APM_AGENT_DEFAULT_SETTINGS = Map.of("transaction_sample_rate", "0.2"); + static Map APM_AGENT_DEFAULT_SETTINGS = Map.of( + "transaction_sample_rate", + "0.2", + "enable_experimental_instrumentations", + "true" + ); public void addClusterSettingsListeners(ClusterService clusterService, APMTelemetryProvider apmTelemetryProvider) { final ClusterSettings clusterSettings = clusterService.getClusterSettings(); final APMTracer apmTracer = apmTelemetryProvider.getTracer(); + final APMMeter apmMeter = apmTelemetryProvider.getMeter(); clusterSettings.addSettingsUpdateConsumer(APM_ENABLED_SETTING, enabled -> { apmTracer.setEnabled(enabled); + this.setAgentSetting("instrument", Boolean.toString(enabled)); + }); + clusterSettings.addSettingsUpdateConsumer(TELEMETRY_METRICS_ENABLED_SETTING, enabled -> { + apmMeter.setEnabled(enabled); // The agent records data other than spans, e.g. JVM metrics, so we toggle this setting in order to // minimise its impact to a running Elasticsearch. this.setAgentSetting("recording", Boolean.toString(enabled)); @@ -106,8 +117,10 @@ public void setAgentSetting(String key, String value) { private static final List PROHIBITED_AGENT_KEYS = List.of( // ES generates a config file and sets this value "config_file", - // ES controls this via `tracing.apm.enabled` - "recording" + // ES controls this via `telemetry.metrics.enabled` + "recording", + // ES controls this via `apm.enabled` + "instrument" ); public static final Setting.AffixSetting APM_AGENT_SETTINGS = Setting.prefixKeySetting( @@ -164,6 +177,13 @@ public void setAgentSetting(String key, String value) { NodeScope ); + public static final Setting TELEMETRY_METRICS_ENABLED_SETTING = Setting.boolSetting( + "telemetry.metrics.enabled", + false, + OperatorDynamic, + NodeScope + ); + public static final Setting APM_SECRET_TOKEN_SETTING = SecureSetting.secureString( APM_SETTING_PREFIX + "secret_token", null diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java index 495afd43bf176..ae9d91cc6ec51 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/APMTelemetryProvider.java @@ -10,19 +10,27 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.apm.internal.metrics.APMMeter; import org.elasticsearch.telemetry.apm.internal.tracing.APMTracer; public class APMTelemetryProvider implements TelemetryProvider { private final Settings settings; private final APMTracer apmTracer; + private final APMMeter apmMeter; public APMTelemetryProvider(Settings settings) { this.settings = settings; apmTracer = new APMTracer(settings); + apmMeter = new APMMeter(settings); } @Override public APMTracer getTracer() { return apmTracer; } + + @Override + public APMMeter getMeter() { + return apmMeter; + } } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeter.java new file mode 100644 index 0000000000000..0a8d425579ca2 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeter.java @@ -0,0 +1,180 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.telemetry.apm.internal.APMTelemetryProvider; +import org.elasticsearch.telemetry.metric.DoubleCounter; +import org.elasticsearch.telemetry.metric.DoubleGauge; +import org.elasticsearch.telemetry.metric.DoubleHistogram; +import org.elasticsearch.telemetry.metric.DoubleUpDownCounter; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongGauge; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.LongUpDownCounter; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.function.Supplier; + +import static org.elasticsearch.telemetry.apm.internal.APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING; + +public class APMMeter extends AbstractLifecycleComponent implements org.elasticsearch.telemetry.metric.Meter { + private final Instruments instruments; + + private final Supplier otelMeterSupplier; + private final Supplier noopMeterSupplier; + + private volatile boolean enabled; + + public APMMeter(Settings settings) { + this(settings, APMMeter.otelMeter(), APMMeter.noopMeter()); + } + + public APMMeter(Settings settings, Supplier otelMeterSupplier, Supplier noopMeterSupplier) { + this.enabled = TELEMETRY_METRICS_ENABLED_SETTING.get(settings); + this.otelMeterSupplier = otelMeterSupplier; + this.noopMeterSupplier = noopMeterSupplier; + this.instruments = new Instruments(enabled ? createOtelMeter() : createNoopMeter()); + } + + /** + * @see org.elasticsearch.telemetry.apm.internal.APMAgentSettings#addClusterSettingsListeners(ClusterService, APMTelemetryProvider) + */ + public void setEnabled(boolean enabled) { + this.enabled = enabled; + if (enabled) { + instruments.setProvider(createOtelMeter()); + } else { + instruments.setProvider(createNoopMeter()); + } + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() { + instruments.setProvider(createNoopMeter()); + } + + @Override + protected void doClose() {} + + @Override + public DoubleCounter registerDoubleCounter(String name, String description, String unit) { + return instruments.registerDoubleCounter(name, description, unit); + } + + @Override + public DoubleCounter getDoubleCounter(String name) { + return instruments.getDoubleCounter(name); + } + + @Override + public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) { + return instruments.registerDoubleUpDownCounter(name, description, unit); + } + + @Override + public DoubleUpDownCounter getDoubleUpDownCounter(String name) { + return instruments.getDoubleUpDownCounter(name); + } + + @Override + public DoubleGauge registerDoubleGauge(String name, String description, String unit) { + return instruments.registerDoubleGauge(name, description, unit); + } + + @Override + public DoubleGauge getDoubleGauge(String name) { + return instruments.getDoubleGauge(name); + } + + @Override + public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) { + return instruments.registerDoubleHistogram(name, description, unit); + } + + @Override + public DoubleHistogram getDoubleHistogram(String name) { + return instruments.getDoubleHistogram(name); + } + + @Override + public LongCounter registerLongCounter(String name, String description, String unit) { + return instruments.registerLongCounter(name, description, unit); + } + + @Override + public LongCounter getLongCounter(String name) { + return instruments.getLongCounter(name); + } + + @Override + public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) { + return instruments.registerLongUpDownCounter(name, description, unit); + } + + @Override + public LongUpDownCounter getLongUpDownCounter(String name) { + return instruments.getLongUpDownCounter(name); + } + + @Override + public LongGauge registerLongGauge(String name, String description, String unit) { + return instruments.registerLongGauge(name, description, unit); + } + + @Override + public LongGauge getLongGauge(String name) { + return instruments.getLongGauge(name); + } + + @Override + public LongHistogram registerLongHistogram(String name, String description, String unit) { + return instruments.registerLongHistogram(name, description, unit); + } + + @Override + public LongHistogram getLongHistogram(String name) { + return instruments.getLongHistogram(name); + } + + Meter createOtelMeter() { + assert this.enabled; + return AccessController.doPrivileged((PrivilegedAction) otelMeterSupplier::get); + } + + private Meter createNoopMeter() { + return noopMeterSupplier.get(); + } + + private static Supplier noopMeter() { + return () -> OpenTelemetry.noop().getMeter("noop"); + } + + // to be used within doPrivileged block + private static Supplier otelMeter() { + var openTelemetry = GlobalOpenTelemetry.get(); + var meter = openTelemetry.getMeter("elasticsearch"); + return () -> meter; + } + + // scope for testing + Instruments getInstruments() { + return instruments; + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractInstrument.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractInstrument.java new file mode 100644 index 0000000000000..d3d485f52bc49 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractInstrument.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.core.Nullable; +import org.elasticsearch.telemetry.metric.Instrument; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An instrument that contains the name, description and unit. The delegate may be replaced when + * the provider is updated. + * Subclasses should implement the builder, which is used on initialization and provider updates. + * @param delegated instrument + */ +public abstract class AbstractInstrument implements Instrument { + private final AtomicReference delegate; + private final String name; + private final String description; + private final String unit; + + public AbstractInstrument(Meter meter, String name, String description, String unit) { + this.name = Objects.requireNonNull(name); + this.description = Objects.requireNonNull(description); + this.unit = Objects.requireNonNull(unit); + this.delegate = new AtomicReference<>(doBuildInstrument(meter)); + } + + private T doBuildInstrument(Meter meter) { + return AccessController.doPrivileged((PrivilegedAction) () -> buildInstrument(meter)); + } + + @Override + public String getName() { + return name; + } + + public String getUnit() { + return unit.toString(); + } + + T getInstrument() { + return delegate.get(); + } + + String getDescription() { + return description; + } + + void setProvider(@Nullable Meter meter) { + delegate.set(doBuildInstrument(Objects.requireNonNull(meter))); + } + + abstract T buildInstrument(Meter meter); +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleCounterAdapter.java new file mode 100644 index 0000000000000..b25ffdff5481b --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleCounterAdapter.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleGaugeAdapter wraps an otel ObservableDoubleMeasurement + */ +public class DoubleCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleCounter { + + public DoubleCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + io.opentelemetry.api.metrics.DoubleCounter buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .counterBuilder(getName()) + .ofDoubles() + .setDescription(getDescription()) + .setUnit(getUnit()) + .build(); + } + + @Override + public void increment() { + getInstrument().add(1d); + } + + @Override + public void incrementBy(double inc) { + assert inc >= 0; + getInstrument().add(inc); + } + + @Override + public void incrementBy(double inc, Map attributes) { + assert inc >= 0; + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java new file mode 100644 index 0000000000000..9d55d475d4a93 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleGaugeAdapter wraps an otel ObservableDoubleMeasurement + */ +public class DoubleGaugeAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleGauge { + + public DoubleGaugeAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.ObservableDoubleMeasurement buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).gaugeBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).buildObserver(); + } + + @Override + public void record(double value) { + getInstrument().record(value); + } + + @Override + public void record(double value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleHistogramAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleHistogramAdapter.java new file mode 100644 index 0000000000000..5fd1a8a189b0f --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleHistogramAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleHistogramAdapter wraps an otel DoubleHistogram + */ +public class DoubleHistogramAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleHistogram { + + public DoubleHistogramAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.DoubleHistogram buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).histogramBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).build(); + } + + @Override + public void record(double value) { + getInstrument().record(value); + } + + @Override + public void record(double value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleUpDownCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleUpDownCounterAdapter.java new file mode 100644 index 0000000000000..9a2fc1b564766 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleUpDownCounterAdapter.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * DoubleUpDownCounterAdapter wraps an otel DoubleUpDownCounter + */ +public class DoubleUpDownCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.DoubleUpDownCounter { + + public DoubleUpDownCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.DoubleUpDownCounter buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .upDownCounterBuilder(getName()) + .ofDoubles() + .setDescription(getDescription()) + .setUnit(getUnit()) + .build(); + } + + @Override + public void add(double inc) { + getInstrument().add(inc); + } + + @Override + public void add(double inc, Map attributes) { + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/Instruments.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/Instruments.java new file mode 100644 index 0000000000000..92d7d692f0ea5 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/Instruments.java @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.telemetry.metric.DoubleCounter; +import org.elasticsearch.telemetry.metric.DoubleGauge; +import org.elasticsearch.telemetry.metric.DoubleHistogram; +import org.elasticsearch.telemetry.metric.DoubleUpDownCounter; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongGauge; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.LongUpDownCounter; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Container for registering and fetching instruments by type and name. + * Instrument names must be unique for a given type on registration. + * {@link #setProvider(Meter)} is used to change the provider for all existing instruments. + */ +public class Instruments { + private final Registrar doubleCounters = new Registrar<>(); + private final Registrar doubleUpDownCounters = new Registrar<>(); + private final Registrar doubleGauges = new Registrar<>(); + private final Registrar doubleHistograms = new Registrar<>(); + private final Registrar longCounters = new Registrar<>(); + private final Registrar longUpDownCounters = new Registrar<>(); + private final Registrar longGauges = new Registrar<>(); + private final Registrar longHistograms = new Registrar<>(); + + private final Meter meter; + + public Instruments(Meter meter) { + this.meter = meter; + } + + private final List> registrars = List.of( + doubleCounters, + doubleUpDownCounters, + doubleGauges, + doubleHistograms, + longCounters, + longUpDownCounters, + longGauges, + longHistograms + ); + + // Access to registration has to be restricted when the provider is updated in ::setProvider + protected final ReleasableLock registerLock = new ReleasableLock(new ReentrantLock()); + + public DoubleCounter registerDoubleCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleCounters.register(new DoubleCounterAdapter(meter, name, description, unit)); + } + } + + public DoubleCounter getDoubleCounter(String name) { + return doubleCounters.get(name); + } + + public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleUpDownCounters.register(new DoubleUpDownCounterAdapter(meter, name, description, unit)); + } + } + + public DoubleUpDownCounter getDoubleUpDownCounter(String name) { + return doubleUpDownCounters.get(name); + } + + public DoubleGauge registerDoubleGauge(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleGauges.register(new DoubleGaugeAdapter(meter, name, description, unit)); + } + } + + public DoubleGauge getDoubleGauge(String name) { + return doubleGauges.get(name); + } + + public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return doubleHistograms.register(new DoubleHistogramAdapter(meter, name, description, unit)); + } + } + + public DoubleHistogram getDoubleHistogram(String name) { + return doubleHistograms.get(name); + } + + public LongCounter registerLongCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longCounters.register(new LongCounterAdapter(meter, name, description, unit)); + } + } + + public LongCounter getLongCounter(String name) { + return longCounters.get(name); + } + + public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longUpDownCounters.register(new LongUpDownCounterAdapter(meter, name, description, unit)); + } + } + + public LongUpDownCounter getLongUpDownCounter(String name) { + return longUpDownCounters.get(name); + } + + public LongGauge registerLongGauge(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longGauges.register(new LongGaugeAdapter(meter, name, description, unit)); + } + } + + public LongGauge getLongGauge(String name) { + return longGauges.get(name); + } + + public LongHistogram registerLongHistogram(String name, String description, String unit) { + try (ReleasableLock lock = registerLock.acquire()) { + return longHistograms.register(new LongHistogramAdapter(meter, name, description, unit)); + } + } + + public LongHistogram getLongHistogram(String name) { + return longHistograms.get(name); + } + + public void setProvider(Meter meter) { + try (ReleasableLock lock = registerLock.acquire()) { + for (Registrar registrar : registrars) { + registrar.setProvider(meter); + } + } + } + + /** + * A typed wrapper for a instrument that + * @param + */ + private static class Registrar> { + private final Map registered = ConcurrentCollections.newConcurrentMap(); + + T register(T instrument) { + registered.compute(instrument.getName(), (k, v) -> { + if (v != null) { + throw new IllegalStateException( + instrument.getClass().getSimpleName() + "[" + instrument.getName() + "] already registered" + ); + } + + return instrument; + }); + return instrument; + } + + T get(String name) { + return registered.get(name); + } + + void setProvider(Meter meter) { + registered.forEach((k, v) -> v.setProvider(meter)); + } + } + + // scope for testing + Meter getMeter() { + return meter; + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongCounterAdapter.java new file mode 100644 index 0000000000000..122d16d9e1aa4 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongCounterAdapter.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongCounterAdapter wraps an otel LongCounter + */ +public class LongCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongCounter { + + public LongCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.LongCounter buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).counterBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).build(); + } + + @Override + public void increment() { + getInstrument().add(1L); + } + + @Override + public void incrementBy(long inc) { + assert inc >= 0; + getInstrument().add(inc); + } + + @Override + public void incrementBy(long inc, Map attributes) { + assert inc >= 0; + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java new file mode 100644 index 0000000000000..48430285a5173 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongGaugeAdapter wraps an otel ObservableLongMeasurement + */ +public class LongGaugeAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongGauge { + + public LongGaugeAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.ObservableLongMeasurement buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .gaugeBuilder(getName()) + .ofLongs() + .setDescription(getDescription()) + .setUnit(getUnit()) + .buildObserver(); + } + + @Override + public void record(long value) { + getInstrument().record(value); + } + + @Override + public void record(long value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongHistogramAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongHistogramAdapter.java new file mode 100644 index 0000000000000..bb5be4866e7b7 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongHistogramAdapter.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongHistogramAdapter wraps an otel LongHistogram + */ +public class LongHistogramAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongHistogram { + + public LongHistogramAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.LongHistogram buildInstrument(Meter meter) { + return Objects.requireNonNull(meter) + .histogramBuilder(getName()) + .ofLongs() + .setDescription(getDescription()) + .setUnit(getUnit()) + .build(); + } + + @Override + public void record(long value) { + getInstrument().record(value); + } + + @Override + public void record(long value, Map attributes) { + getInstrument().record(value, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongUpDownCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongUpDownCounterAdapter.java new file mode 100644 index 0000000000000..e5af85e4ed192 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongUpDownCounterAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.metrics.Meter; + +import java.util.Map; +import java.util.Objects; + +/** + * LongUpDownCounterAdapter wraps an otel LongUpDownCounter + */ +public class LongUpDownCounterAdapter extends AbstractInstrument + implements + org.elasticsearch.telemetry.metric.LongUpDownCounter { + + public LongUpDownCounterAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + } + + @Override + io.opentelemetry.api.metrics.LongUpDownCounter buildInstrument(Meter meter) { + var builder = Objects.requireNonNull(meter).upDownCounterBuilder(getName()); + return builder.setDescription(getDescription()).setUnit(getUnit()).build(); + } + + @Override + public void add(long inc) { + getInstrument().add(inc); + } + + @Override + public void add(long inc, Map attributes) { + getInstrument().add(inc, OtelHelper.fromMap(attributes)); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java new file mode 100644 index 0000000000000..673025a1a41f4 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.common.Attributes; + +import java.util.Map; + +class OtelHelper { + static Attributes fromMap(Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return Attributes.empty(); + } + var builder = Attributes.builder(); + attributes.forEach((k, v) -> { + if (v instanceof String value) { + builder.put(k, value); + } else if (v instanceof Long value) { + builder.put(k, value); + } else if (v instanceof Double value) { + builder.put(k, value); + } else if (v instanceof Boolean value) { + builder.put(k, value); + } else { + throw new IllegalArgumentException("attributes do not support value type of [" + v.getClass().getCanonicalName() + "]"); + } + }); + return builder.build(); + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java index daedb90047975..9f9fdb3dc26ef 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java @@ -150,7 +150,6 @@ APMServices createApmServices() { return AccessController.doPrivileged((PrivilegedAction) () -> { var openTelemetry = GlobalOpenTelemetry.get(); var tracer = openTelemetry.getTracer("elasticsearch", Version.CURRENT.toString()); - return new APMServices(tracer, openTelemetry); }); } @@ -452,4 +451,5 @@ private static Automaton patternsToAutomaton(List patterns) { } return Operations.union(automata); } + } diff --git a/modules/apm/src/main/plugin-metadata/plugin-security.policy b/modules/apm/src/main/plugin-metadata/plugin-security.policy index b85d3ec05c277..57da3a2efd301 100644 --- a/modules/apm/src/main/plugin-metadata/plugin-security.policy +++ b/modules/apm/src/main/plugin-metadata/plugin-security.policy @@ -11,6 +11,8 @@ grant { permission java.lang.RuntimePermission "createClassLoader"; permission java.lang.RuntimePermission "getClassLoader"; permission java.util.PropertyPermission "elastic.apm.*", "write"; + permission java.util.PropertyPermission "*", "read,write"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; }; grant codeBase "${codebase.elastic-apm-agent}" { diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeterTests.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeterTests.java new file mode 100644 index 0000000000000..1064b8820b089 --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/APMMeterTests.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.telemetry.apm.internal.APMAgentSettings; +import org.elasticsearch.telemetry.metric.DoubleCounter; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.sameInstance; + +public class APMMeterTests extends ESTestCase { + Meter testOtel = OpenTelemetry.noop().getMeter("test"); + + Meter noopOtel = OpenTelemetry.noop().getMeter("noop"); + + public void testMeterIsSetUponConstruction() { + // test default + APMMeter apmMeter = new APMMeter(Settings.EMPTY, () -> testOtel, () -> noopOtel); + + Meter meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + + // test explicitly enabled + var settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(testOtel)); + + // test explicitly disabled + settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + } + + public void testMeterIsOverridden() { + APMMeter apmMeter = new APMMeter(Settings.EMPTY, () -> testOtel, () -> noopOtel); + + Meter meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + + apmMeter.setEnabled(true); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(testOtel)); + } + + public void testLookupByName() { + var settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + + var apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + + DoubleCounter registeredCounter = apmMeter.registerDoubleCounter("name", "desc", "unit"); + DoubleCounter lookedUpCounter = apmMeter.getDoubleCounter("name"); + + assertThat(lookedUpCounter, sameInstance(registeredCounter)); + } + + public void testNoopIsSetOnStop() { + var settings = Settings.builder().put(APMAgentSettings.TELEMETRY_METRICS_ENABLED_SETTING.getKey(), true).build(); + APMMeter apmMeter = new APMMeter(settings, () -> testOtel, () -> noopOtel); + apmMeter.start(); + + Meter meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(testOtel)); + + apmMeter.stop(); + + meter = apmMeter.getInstruments().getMeter(); + assertThat(meter, sameInstance(noopOtel)); + } + +} diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsConcurrencyTests.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsConcurrencyTests.java new file mode 100644 index 0000000000000..51285894f27ee --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsConcurrencyTests.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.DoubleCounterBuilder; +import io.opentelemetry.api.metrics.DoubleGaugeBuilder; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.LongUpDownCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class InstrumentsConcurrencyTests extends ESTestCase { + String name = "name"; + String description = "desc"; + String unit = "kg"; + Meter noopMeter = OpenTelemetry.noop().getMeter("noop"); + CountDownLatch registerLatch = new CountDownLatch(1); + Meter lockingMeter = new Meter() { + @Override + public LongCounterBuilder counterBuilder(String name) { + return new LockingLongCounterBuilder(); + } + + @Override + public LongUpDownCounterBuilder upDownCounterBuilder(String name) { + return null; + } + + @Override + public DoubleHistogramBuilder histogramBuilder(String name) { + return null; + } + + @Override + public DoubleGaugeBuilder gaugeBuilder(String name) { + return null; + } + }; + + class LockingLongCounterBuilder implements LongCounterBuilder { + + @Override + public LongCounterBuilder setDescription(String description) { + return this; + } + + @Override + public LongCounterBuilder setUnit(String unit) { + return this; + } + + @Override + public DoubleCounterBuilder ofDoubles() { + return null; + } + + @Override + public LongCounter build() { + try { + registerLatch.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + } + + @Override + public ObservableLongCounter buildWithCallback(Consumer callback) { + return null; + } + } + + public void testLockingWhenRegistering() throws Exception { + Instruments instruments = new Instruments(lockingMeter); + + var registerThread = new Thread(() -> instruments.registerLongCounter(name, description, unit)); + // registerThread has a countDown latch that is simulating a long-running registration + registerThread.start(); + var setProviderThread = new Thread(() -> instruments.setProvider(noopMeter)); + // a setProviderThread will attempt to override a meter, but will wait to acquireLock + setProviderThread.start(); + + // assert that a thread is waiting for a lock during long-running registration + assertBusy(() -> assertThat(setProviderThread.getState(), equalTo(Thread.State.WAITING))); + // assert that the old lockingMeter is still in place + assertBusy(() -> assertThat(instruments.getMeter(), sameInstance(lockingMeter))); + + // finish long-running registration + registerLatch.countDown(); + // assert that a meter was overriden + assertBusy(() -> assertThat(instruments.getMeter(), sameInstance(lockingMeter))); + + } +} diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsTests.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsTests.java new file mode 100644 index 0000000000000..daf511fcf7042 --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/InstrumentsTests.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.Meter; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class InstrumentsTests extends ESTestCase { + Meter noopMeter = OpenTelemetry.noop().getMeter("noop"); + Meter someOtherMeter = OpenTelemetry.noop().getMeter("xyz"); + String name = "name"; + String description = "desc"; + String unit = "kg"; + + public void testRegistrationAndLookup() { + Instruments instruments = new Instruments(noopMeter); + { + var registered = instruments.registerDoubleCounter(name, description, unit); + var lookedUp = instruments.getDoubleCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerDoubleUpDownCounter(name, description, unit); + var lookedUp = instruments.getDoubleUpDownCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerDoubleGauge(name, description, unit); + var lookedUp = instruments.getDoubleGauge(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerDoubleHistogram(name, description, unit); + var lookedUp = instruments.getDoubleHistogram(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongCounter(name, description, unit); + var lookedUp = instruments.getLongCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongUpDownCounter(name, description, unit); + var lookedUp = instruments.getLongUpDownCounter(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongGauge(name, description, unit); + var lookedUp = instruments.getLongGauge(name); + assertThat(registered, sameInstance(lookedUp)); + } + { + var registered = instruments.registerLongHistogram(name, description, unit); + var lookedUp = instruments.getLongHistogram(name); + assertThat(registered, sameInstance(lookedUp)); + } + } + + public void testNameValidation() { + Instruments instruments = new Instruments(noopMeter); + + instruments.registerLongHistogram(name, description, unit); + var e = expectThrows(IllegalStateException.class, () -> instruments.registerLongHistogram(name, description, unit)); + assertThat(e.getMessage(), equalTo("LongHistogramAdapter[name] already registered")); + } +} diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 99ce5910c9775..1a082e7558577 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -385,6 +385,7 @@ org.elasticsearch.serverless.apifiltering; exports org.elasticsearch.telemetry.tracing; exports org.elasticsearch.telemetry; + exports org.elasticsearch.telemetry.metric; provides java.util.spi.CalendarDataProvider with org.elasticsearch.common.time.IsoCalendarDataProvider; provides org.elasticsearch.xcontent.ErrorOnUnknown with org.elasticsearch.common.xcontent.SuggestingErrorOnUnknown; diff --git a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java index 0df8aeedac7f8..add994787227f 100644 --- a/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java +++ b/server/src/main/java/org/elasticsearch/telemetry/TelemetryProvider.java @@ -8,11 +8,15 @@ package org.elasticsearch.telemetry; +import org.elasticsearch.telemetry.metric.Meter; import org.elasticsearch.telemetry.tracing.Tracer; public interface TelemetryProvider { + Tracer getTracer(); + Meter getMeter(); + TelemetryProvider NOOP = new TelemetryProvider() { @Override @@ -20,5 +24,9 @@ public Tracer getTracer() { return Tracer.NOOP; } + @Override + public Meter getMeter() { + return Meter.NOOP; + } }; } diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleCounter.java new file mode 100644 index 0000000000000..c98701bb0a1bb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleCounter.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A monotonically increasing metric that uses a double. + * Useful for capturing the number of bytes received, number of requests, etc. + */ +public interface DoubleCounter extends Instrument { + /** + * Add one to the current counter. + */ + void increment(); + + /** + * Increment the counter. + * @param inc amount to increment, non-negative + */ + void incrementBy(double inc); + + /** + * Increment the counter. + * @param inc amount to increment, non-negative + * @param attributes key-value pairs to associate with this increment + */ + void incrementBy(double inc, Map attributes); + + /** + * Noop counter for use in tests. + */ + DoubleCounter NOOP = new DoubleCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void increment() { + + } + + @Override + public void incrementBy(double inc) { + + } + + @Override + public void incrementBy(double inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java new file mode 100644 index 0000000000000..797c125900bb8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record non-additive double values. eg number of running threads, current load + */ +public interface DoubleGauge extends Instrument { + /** + * Record the current value for measured item + */ + void record(double value); + + /** + * Record the current value + * @param attributes key-value pairs to associate with the current measurement + */ + void record(double value, Map attributes); + + /** + * Noop gauge for tests + */ + DoubleGauge NOOP = new DoubleGauge() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(double value) { + + } + + @Override + public void record(double value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleHistogram.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleHistogram.java new file mode 100644 index 0000000000000..11958ea36cd3d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleHistogram.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record arbitrary values that are summarized statistically, useful for percentiles and histograms. + */ +public interface DoubleHistogram extends Instrument { + /** + * Record a sample for the measured item + * @param value + */ + void record(double value); + + /** + * Record a sample for the measured item + * @param attributes key-value pairs to associate with the current sample + */ + void record(double value, Map attributes); + + /** + * Noop histogram for tests + */ + DoubleHistogram NOOP = new DoubleHistogram() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(double value) { + + } + + @Override + public void record(double value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleUpDownCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleUpDownCounter.java new file mode 100644 index 0000000000000..7d484ebf07d32 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/DoubleUpDownCounter.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A counter that supports decreasing and increasing values. + * Useful for capturing the number of requests in a queue. + */ +public interface DoubleUpDownCounter extends Instrument { + /** + * Add to the counter + * @param inc may be negative. + */ + void add(double inc); + + /** + * Add to the counter + * @param inc may be negative. + * @param attributes key-value pairs to associate with this increment + */ + void add(double inc, Map attributes); + + /** + * Noop counter for use in tests + */ + DoubleUpDownCounter NOOP = new DoubleUpDownCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void add(double inc) { + + } + + @Override + public void add(double inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/Instrument.java b/server/src/main/java/org/elasticsearch/telemetry/metric/Instrument.java new file mode 100644 index 0000000000000..19a7e259120f2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/Instrument.java @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +public interface Instrument { + String getName(); +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongCounter.java new file mode 100644 index 0000000000000..f8f2150163835 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongCounter.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A monotonically increasing metric that uses a long. Useful for integral values such as the number of bytes received, + * number of requests, etc. + */ +public interface LongCounter extends Instrument { + /** + * Add one to the current counter + */ + void increment(); + + /** + * Increment the counter + * @param inc amount to increment + */ + void incrementBy(long inc); + + /** + * Increment the counter. + * @param inc amount to increment + * @param attributes key-value pairs to associate with this increment + */ + void incrementBy(long inc, Map attributes); + + /** + * Noop counter for use in tests. + */ + LongCounter NOOP = new LongCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void increment() { + + } + + @Override + public void incrementBy(long inc) { + + } + + @Override + public void incrementBy(long inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java new file mode 100644 index 0000000000000..71539064ce53e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record non-additive long values. + */ +public interface LongGauge extends Instrument { + + /** + * Record the current value of the measured item. + * @param value + */ + void record(long value); + + /** + * Record the current value + * @param attributes key-value pairs to associate with the current measurement + */ + void record(long value, Map attributes); + + /** + * Noop gauge for tests + */ + LongGauge NOOP = new LongGauge() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(long value) { + + } + + @Override + public void record(long value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongHistogram.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongHistogram.java new file mode 100644 index 0000000000000..27d5261f755ef --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongHistogram.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * Record arbitrary values that are summarized statistically, useful for percentiles and histograms. + */ +public interface LongHistogram extends Instrument { + /** + * Record a sample for the measured item + * @param value + */ + void record(long value); + + /** + * Record a sample for the measured item + * @param attributes key-value pairs to associate with the current sample + */ + void record(long value, Map attributes); + + /** + * Noop histogram for tests + */ + LongHistogram NOOP = new LongHistogram() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void record(long value) { + + } + + @Override + public void record(long value, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/LongUpDownCounter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/LongUpDownCounter.java new file mode 100644 index 0000000000000..f62030da8f6bd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/LongUpDownCounter.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +import java.util.Map; + +/** + * A counter that supports decreasing and increasing values. + * Useful for capturing the number of requests in a queue. + */ +public interface LongUpDownCounter extends Instrument { + /** + * Add to the counter + * @param inc may be negative. + */ + void add(long inc); + + /** + * Add to the counter + * @param inc may be negative. + * @param attributes key-value pairs to associate with this increment + */ + void add(long inc, Map attributes); + + /** + * Noop counter for use in tests + */ + LongUpDownCounter NOOP = new LongUpDownCounter() { + @Override + public String getName() { + return "noop"; + } + + @Override + public void add(long inc) { + + } + + @Override + public void add(long inc, Map attributes) { + + } + }; +} diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/Meter.java b/server/src/main/java/org/elasticsearch/telemetry/metric/Meter.java new file mode 100644 index 0000000000000..77bbf6f673fd3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/Meter.java @@ -0,0 +1,228 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.metric; + +/** + * Container for metering instruments. Meters with the same name and type (DoubleCounter, etc) can + * only be registered once. + * TODO(stu): describe name, unit and description + */ +public interface Meter { + /** + * Register a {@link DoubleCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleCounter registerDoubleCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleCounter}. + * @param name name of the counter + * @return the registered meter. + */ + DoubleCounter getDoubleCounter(String name); + + /** + * Register a {@link DoubleUpDownCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleUpDownCounter}. + * @param name name of the counter + * @return the registered meter. + */ + DoubleUpDownCounter getDoubleUpDownCounter(String name); + + /** + * Register a {@link DoubleGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleGauge registerDoubleGauge(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleGauge}. + * @param name name of the gauge + * @return the registered meter. + */ + DoubleGauge getDoubleGauge(String name); + + /** + * Register a {@link DoubleHistogram}. The returned object may be reused. + * @param name name of the histogram + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + DoubleHistogram registerDoubleHistogram(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link DoubleHistogram}. + * @param name name of the histogram + * @return the registered meter. + */ + DoubleHistogram getDoubleHistogram(String name); + + /** + * Register a {@link LongCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongCounter registerLongCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongCounter}. + * @param name name of the counter + * @return the registered meter. + */ + LongCounter getLongCounter(String name); + + /** + * Register a {@link LongUpDownCounter}. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongUpDownCounter}. + * @param name name of the counter + * @return the registered meter. + */ + LongUpDownCounter getLongUpDownCounter(String name); + + /** + * Register a {@link LongGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongGauge registerLongGauge(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongGauge}. + * @param name name of the gauge + * @return the registered meter. + */ + LongGauge getLongGauge(String name); + + /** + * Register a {@link LongHistogram}. The returned object may be reused. + * @param name name of the histogram + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @return the registered meter. + */ + LongHistogram registerLongHistogram(String name, String description, String unit); + + /** + * Retrieved a previously registered {@link LongHistogram}. + * @param name name of the histogram + * @return the registered meter. + */ + LongHistogram getLongHistogram(String name); + + /** + * Noop implementation for tests + */ + Meter NOOP = new Meter() { + @Override + public DoubleCounter registerDoubleCounter(String name, String description, String unit) { + return DoubleCounter.NOOP; + } + + @Override + public DoubleCounter getDoubleCounter(String name) { + return DoubleCounter.NOOP; + } + + public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) { + return DoubleUpDownCounter.NOOP; + } + + @Override + public DoubleUpDownCounter getDoubleUpDownCounter(String name) { + return DoubleUpDownCounter.NOOP; + } + + @Override + public DoubleGauge registerDoubleGauge(String name, String description, String unit) { + return DoubleGauge.NOOP; + } + + @Override + public DoubleGauge getDoubleGauge(String name) { + return DoubleGauge.NOOP; + } + + @Override + public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) { + return DoubleHistogram.NOOP; + } + + @Override + public DoubleHistogram getDoubleHistogram(String name) { + return DoubleHistogram.NOOP; + } + + @Override + public LongCounter registerLongCounter(String name, String description, String unit) { + return LongCounter.NOOP; + } + + @Override + public LongCounter getLongCounter(String name) { + return LongCounter.NOOP; + } + + @Override + public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) { + return LongUpDownCounter.NOOP; + } + + @Override + public LongUpDownCounter getLongUpDownCounter(String name) { + return LongUpDownCounter.NOOP; + } + + @Override + public LongGauge registerLongGauge(String name, String description, String unit) { + return LongGauge.NOOP; + } + + @Override + public LongGauge getLongGauge(String name) { + return LongGauge.NOOP; + } + + @Override + public LongHistogram registerLongHistogram(String name, String description, String unit) { + return LongHistogram.NOOP; + } + + @Override + public LongHistogram getLongHistogram(String name) { + return LongHistogram.NOOP; + } + }; +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index ed4e7683dfefc..1bd97415a6108 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -22,8 +22,6 @@ import org.elasticsearch.compute.operator.BatchEncoder; import org.elasticsearch.compute.operator.HashAggregationOperator; import org.elasticsearch.compute.operator.MultivalueDedupe; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import java.util.Arrays; import java.util.List; @@ -51,19 +49,20 @@ * } */ final class PackedValuesBlockHash extends BlockHash { - private static final Logger logger = LogManager.getLogger(PackedValuesBlockHash.class); static final int DEFAULT_BATCH_SIZE = Math.toIntExact(ByteSizeValue.ofKb(10).getBytes()); - private final List groups; private final int emitBatchSize; private final BytesRefHash bytesRefHash; private final int nullTrackingBytes; + private final BytesRef scratch = new BytesRef(); + private final BytesRefBuilder bytes = new BytesRefBuilder(); + private final Group[] groups; - PackedValuesBlockHash(List groups, BigArrays bigArrays, int emitBatchSize) { - this.groups = groups; + PackedValuesBlockHash(List specs, BigArrays bigArrays, int emitBatchSize) { + this.groups = specs.stream().map(Group::new).toArray(Group[]::new); this.emitBatchSize = emitBatchSize; this.bytesRefHash = new BytesRefHash(1, bigArrays); - this.nullTrackingBytes = groups.size() / 8 + 1; + this.nullTrackingBytes = (groups.length + 7) / 8; } @Override @@ -75,23 +74,28 @@ void add(Page page, GroupingAggregatorFunction.AddInput addInput, int batchSize) new AddWork(page, addInput, batchSize).add(); } + private static class Group { + final HashAggregationOperator.GroupSpec spec; + BatchEncoder encoder; + int positionOffset; + int valueOffset; + int loopedIndex; + int valueCount; + int bytesStart; + + Group(HashAggregationOperator.GroupSpec spec) { + this.spec = spec; + } + } + class AddWork extends LongLongBlockHash.AbstractAddBlock { - final BatchEncoder[] encoders = new BatchEncoder[groups.size()]; - final int[] positionOffsets = new int[groups.size()]; - final int[] valueOffsets = new int[groups.size()]; - final BytesRef[] scratches = new BytesRef[groups.size()]; - final BytesRefBuilder bytes = new BytesRefBuilder(); final int positionCount; - int position; - int count; - int bufferedGroup; AddWork(Page page, GroupingAggregatorFunction.AddInput addInput, int batchSize) { super(emitBatchSize, addInput); - for (int g = 0; g < groups.size(); g++) { - encoders[g] = MultivalueDedupe.batchEncoder(new Block.Ref(page.getBlock(groups.get(g).channel()), page), batchSize); - scratches[g] = new BytesRef(); + for (Group group : groups) { + group.encoder = MultivalueDedupe.batchEncoder(new Block.Ref(page.getBlock(group.spec.channel()), page), batchSize); } bytes.grow(nullTrackingBytes); this.positionCount = page.getPositionCount(); @@ -104,91 +108,86 @@ class AddWork extends LongLongBlockHash.AbstractAddBlock { */ void add() { for (position = 0; position < positionCount; position++) { - if (logger.isTraceEnabled()) { - logger.trace("position {}", position); - } // Make sure all encoders have encoded the current position and the offsets are queued to it's start - for (int g = 0; g < encoders.length; g++) { - positionOffsets[g]++; - while (positionOffsets[g] >= encoders[g].positionCount()) { - encoders[g].encodeNextBatch(); - positionOffsets[g] = 0; - valueOffsets[g] = 0; + boolean singleEntry = true; + for (Group g : groups) { + var encoder = g.encoder; + g.positionOffset++; + while (g.positionOffset >= encoder.positionCount()) { + encoder.encodeNextBatch(); + g.positionOffset = 0; + g.valueOffset = 0; } + g.valueCount = encoder.valueCount(g.positionOffset); + singleEntry &= (g.valueCount == 1); } - - count = 0; Arrays.fill(bytes.bytes(), 0, nullTrackingBytes, (byte) 0); bytes.setLength(nullTrackingBytes); - addPosition(0); - switch (count) { - case 0 -> throw new IllegalStateException("didn't find any values"); - case 1 -> { - ords.appendInt(bufferedGroup); - addedValue(position); - } - default -> ords.endPositionEntry(); - } - for (int g = 0; g < encoders.length; g++) { - valueOffsets[g] += encoders[g].valueCount(positionOffsets[g]); + if (singleEntry) { + addSingleEntry(); + } else { + addMultipleEntries(); } } emitOrds(); } - private void addPosition(int g) { - if (g == groups.size()) { - addBytes(); - return; - } - int start = bytes.length(); - int count = encoders[g].valueCount(positionOffsets[g]); - assert count > 0; - int valueOffset = valueOffsets[g]; - BytesRef v = encoders[g].read(valueOffset++, scratches[g]); - if (logger.isTraceEnabled()) { - logger.trace("\t".repeat(g + 1) + v); - } - if (v.length == 0) { - assert count == 1 : "null value in non-singleton list"; - int nullByte = g / 8; - int nullShift = g % 8; - bytes.bytes()[nullByte] |= (byte) (1 << nullShift); - } - bytes.setLength(start); - bytes.append(v); - addPosition(g + 1); // TODO stack overflow protection - for (int i = 1; i < count; i++) { - v = encoders[g].read(valueOffset++, scratches[g]); - if (logger.isTraceEnabled()) { - logger.trace("\t".repeat(g + 1) + v); + private void addSingleEntry() { + for (int g = 0; g < groups.length; g++) { + Group group = groups[g]; + BytesRef v = group.encoder.read(group.valueOffset++, scratch); + if (v.length == 0) { + int nullByte = g / 8; + int nullShift = g % 8; + bytes.bytes()[nullByte] |= (byte) (1 << nullShift); + } else { + bytes.append(v); } - assert v.length > 0 : "null value after the first position"; - bytes.setLength(start); - bytes.append(v); - addPosition(g + 1); } + int ord = Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))); + ords.appendInt(ord); + addedValue(position); } - private void addBytes() { - int group = Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))); - switch (count) { - case 0 -> bufferedGroup = group; - case 1 -> { - ords.beginPositionEntry(); - ords.appendInt(bufferedGroup); - addedValueInMultivaluePosition(position); - ords.appendInt(group); - addedValueInMultivaluePosition(position); + private void addMultipleEntries() { + ords.beginPositionEntry(); + int g = 0; + outer: for (;;) { + for (; g < groups.length; g++) { + Group group = groups[g]; + group.bytesStart = bytes.length(); + BytesRef v = group.encoder.read(group.valueOffset + group.loopedIndex, scratch); + ++group.loopedIndex; + if (v.length == 0) { + assert group.valueCount == 1 : "null value in non-singleton list"; + int nullByte = g / 8; + int nullShift = g % 8; + bytes.bytes()[nullByte] |= (byte) (1 << nullShift); + } else { + bytes.append(v); + } } - default -> { - ords.appendInt(group); - addedValueInMultivaluePosition(position); + // emit ords + int ord = Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))); + ords.appendInt(ord); + addedValueInMultivaluePosition(position); + + // rewind + Group group = groups[--g]; + bytes.setLength(group.bytesStart); + while (group.loopedIndex == group.valueCount) { + group.loopedIndex = 0; + if (g == 0) { + break outer; + } else { + group = groups[--g]; + bytes.setLength(group.bytesStart); + } } } - count++; - if (logger.isTraceEnabled()) { - logger.trace("{} = {}", bytes.get(), group); + ords.endPositionEntry(); + for (Group group : groups) { + group.valueOffset += group.valueCount; } } } @@ -196,16 +195,16 @@ private void addBytes() { @Override public Block[] getKeys() { int size = Math.toIntExact(bytesRefHash.size()); - BatchEncoder.Decoder[] decoders = new BatchEncoder.Decoder[groups.size()]; - Block.Builder[] builders = new Block.Builder[groups.size()]; + BatchEncoder.Decoder[] decoders = new BatchEncoder.Decoder[groups.length]; + Block.Builder[] builders = new Block.Builder[groups.length]; for (int g = 0; g < builders.length; g++) { - ElementType elementType = groups.get(g).elementType(); + ElementType elementType = groups[g].spec.elementType(); decoders[g] = BatchEncoder.decoder(elementType); builders[g] = elementType.newBlockBuilder(size); } - BytesRef values[] = new BytesRef[(int) Math.min(100, bytesRefHash.size())]; - BytesRef nulls[] = new BytesRef[values.length]; + BytesRef[] values = new BytesRef[(int) Math.min(100, bytesRefHash.size())]; + BytesRef[] nulls = new BytesRef[values.length]; for (int offset = 0; offset < values.length; offset++) { values[offset] = new BytesRef(); nulls[offset] = new BytesRef(); @@ -231,7 +230,7 @@ public Block[] getKeys() { readKeys(decoders, builders, nulls, values, offset); } - Block[] keyBlocks = new Block[groups.size()]; + Block[] keyBlocks = new Block[groups.length]; for (int g = 0; g < keyBlocks.length; g++) { keyBlocks[g] = builders[g].build(); } @@ -271,13 +270,12 @@ public String toString() { StringBuilder b = new StringBuilder(); b.append("PackedValuesBlockHash{groups=["); boolean first = true; - for (HashAggregationOperator.GroupSpec spec : groups) { - if (first) { - first = false; - } else { + for (int i = 0; i < groups.length; i++) { + if (i > 0) { b.append(", "); } - b.append(spec.channel()).append(':').append(spec.elementType()); + Group group = groups[i]; + b.append(group.spec.channel()).append(':').append(group.spec.elementType()); } b.append("], entries=").append(bytesRefHash.size()); b.append(", size=").append(ByteSizeValue.ofBytes(bytesRefHash.ramBytesUsed()));