Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into async-flush
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Nov 27, 2023
2 parents c1a703f + 8914314 commit 391acde
Show file tree
Hide file tree
Showing 51 changed files with 1,299 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.topn.TopNEncoder;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.indices.breaker.CircuitBreakerMetrics;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -103,6 +104,7 @@ private static Operator operator(String data, int topCount) {
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(
CircuitBreakerMetrics.NOOP,
Settings.EMPTY,
List.of(),
ClusterSettings.createBuiltInClusterSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.indices.breaker.CircuitBreakerMetrics;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
Expand Down Expand Up @@ -108,7 +109,12 @@ public class AggConstructionContentionBenchmark {
@Setup
public void setup() {
breakerService = switch (breaker) {
case "real", "preallocate" -> new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings);
case "real", "preallocate" -> new HierarchyCircuitBreakerService(
CircuitBreakerMetrics.NOOP,
Settings.EMPTY,
List.of(),
clusterSettings
);
case "noop" -> new NoneCircuitBreakerService();
default -> throw new UnsupportedOperationException();
};
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/101390.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101390
summary: Enable inter-segment concurrency for terms aggs
area: Aggregations
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/101423.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101423
summary: Export circuit breaker trip count as a counter metric
area: Aggregations
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.function.ToLongFunction;

public class ParentAggregationBuilder extends ValuesSourceAggregationBuilder<ParentAggregationBuilder> {

Expand Down Expand Up @@ -90,7 +91,7 @@ public BucketCardinality bucketCardinality() {
}

@Override
public boolean supportsParallelCollection() {
public boolean supportsParallelCollection(ToLongFunction<String> fieldCardinalityResolver) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ public void testNodeHttpStats() throws IOException {
assertHttpStats(new XContentTestUtils.JsonMapView((Map<String, Object>) nodesMap.get(nodeId)));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102547")
@SuppressWarnings("unchecked")
public void testClusterInfoHttpStats() throws IOException {
internalCluster().ensureAtLeastNumDataNodes(3);
performHttpRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public ChannelActionListener(TransportChannel channel) {

@Override
public void onResponse(Response response) {
    // Take an extra reference up front: it is the one that channel.sendResponse below will release.
    response.incRef();
    ActionListener.run(this, listener -> listener.channel.sendResponse(response));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.telemetry.metric.LongCounter;

import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -29,17 +30,25 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
private final Logger logger;
private final HierarchyCircuitBreakerService parent;
private final String name;
private final LongCounter trippedCountMeter;

/**
* Create a circuit breaker that will break if the number of estimated
* bytes grows above the limit. All estimations will be multiplied by
* the given overheadConstant. Uses the given oldBreaker to initialize
* the starting offset.
* @param trippedCountMeter the counter used to report the tripped count metric
* @param settings settings to configure this breaker
* @param logger the logger this breaker writes its messages to
* @param parent parent circuit breaker service to delegate tripped breakers to
* @param name the name of the breaker
*/
public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger, HierarchyCircuitBreakerService parent, String name) {
public ChildMemoryCircuitBreaker(
LongCounter trippedCountMeter,
BreakerSettings settings,
Logger logger,
HierarchyCircuitBreakerService parent,
String name
) {
this.name = name;
this.limitAndOverhead = new LimitAndOverhead(settings.getLimit(), settings.getOverhead());
this.durability = settings.getDurability();
Expand All @@ -48,6 +57,7 @@ public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger, Hierar
this.logger = logger;
logger.trace(() -> format("creating ChildCircuitBreaker with settings %s", settings));
this.parent = parent;
this.trippedCountMeter = trippedCountMeter;
}

/**
Expand All @@ -58,6 +68,7 @@ public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger, Hierar
public void circuitBreak(String fieldName, long bytesNeeded) {
final long memoryBytesLimit = this.limitAndOverhead.limit;
this.trippedCount.incrementAndGet();
this.trippedCountMeter.increment();
final String message = "["
+ this.name
+ "] Data too large, data for ["
Expand Down
10 changes: 7 additions & 3 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsAccounting;
Expand Down Expand Up @@ -158,7 +159,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
private final Supplier<DocumentParsingObserver> documentParsingObserverSupplier;

@SuppressWarnings("this-escape")
public IndexService(
Expand Down Expand Up @@ -231,7 +232,7 @@ public IndexService(
this.indexSortSupplier = () -> indexSettings.getIndexSortConfig()
.buildIndexSort(
mapperService::fieldType,
(fieldType, searchLookup) -> indexFieldData.getForField(fieldType, FieldDataContext.noRuntimeFields("index sort"))
(fieldType, searchLookup) -> loadFielddata(fieldType, FieldDataContext.noRuntimeFields("index sort"))
);
} else {
this.indexSortSupplier = () -> null;
Expand Down Expand Up @@ -662,7 +663,7 @@ public SearchExecutionContext newSearchExecutionContext(
shardRequestIndex,
indexSettings,
indexCache.bitsetFilterCache(),
indexFieldData::getForField,
this::loadFielddata,
mapperService(),
mapperService().mappingLookup(),
similarityService(),
Expand Down Expand Up @@ -1293,4 +1294,7 @@ public static Map<String, MappedFieldType> parseRuntimeMappings(
return runtimeFieldTypes;
}

/**
 * Looks up the field data for a mapped field by delegating to {@code indexFieldData.getForField}.
 *
 * @param fieldType        the mapped field type to load field data for
 * @param fieldDataContext the context passed through unchanged to the lookup
 * @return whatever {@code indexFieldData.getForField} returns for the given arguments
 */
public IndexFieldData<?> loadFielddata(MappedFieldType fieldType, FieldDataContext fieldDataContext) {
    final IndexFieldData<?> fieldData = indexFieldData.getForField(fieldType, fieldDataContext);
    return fieldData;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.breaker;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
 * A class collecting trip counters for circuit breakers (parent, field data, request, in flight requests and custom child circuit
 * breakers).
 *
 * <p>The circuit breaker name is part of the (long) counter metric name instead of being an attribute because aggregating the trip
 * counter values of distinct circuit breakers does not make sense; for instance, summing es.breaker.field_data.trip.total and
 * es.breaker.in_flight_requests.trip.total.
 * Those counters trip for different reasons even if the underlying reason is "too much memory usage". Aggregating them together results
 * in losing the ability to understand where the underlying issue is (too much field data, too many concurrent requests, too large
 * concurrent requests?). Aggregating each one of them separately to get, for instance, cluster level or cloud region level statistics
 * is perfectly fine, instead.
 *
 * <p>NOTE: here we have the ability to register custom trip counters too. This ability is something a few plugins take advantage of
 * nowadays. At the time of writing this class it is just "Eql" and "MachineLearning" which track memory used to store "things" that are
 * application/plugin specific such as eql sequence query objects and inference model objects. As a result, we just have a couple of
 * these custom counters. This means we have 6 circuit breaker counter metrics per node (parent, field_data, request,
 * in_flight_requests, eql_sequence and model_inference). We register them a bit differently to keep the ability for plugins to define
 * their own circuit breaker trip counters.
 *
 * <p>Custom counters registered through {@link #addCustomCircuitBreaker(CircuitBreaker)} are stored under their canonical metric name
 * ({@code es.breaker.<name>.trip.total}). {@link #getCustomTripCount(String, LongCounter)} accepts either the key exactly as stored or
 * the plain breaker name.
 *
 * <p>The map handed to the public constructor is stored as-is (it is not copied), so {@link #addCustomCircuitBreaker(CircuitBreaker)}
 * only works when that map is mutable. {@link #NOOP} is built on {@link Collections#emptyMap()}, which is immutable.
 */
public class CircuitBreakerMetrics {
    public static final CircuitBreakerMetrics NOOP = new CircuitBreakerMetrics(TelemetryProvider.NOOP, Collections.emptyMap());
    public static final String ES_BREAKER_PARENT_TRIP_COUNT_TOTAL = "es.breaker.parent.trip.total";
    public static final String ES_BREAKER_FIELD_DATA_TRIP_COUNT_TOTAL = "es.breaker.field_data.trip.total";
    public static final String ES_BREAKER_REQUEST_TRIP_COUNT_TOTAL = "es.breaker.request.trip.total";
    public static final String ES_BREAKER_IN_FLIGHT_REQUESTS_TRIP_COUNT_TOTAL = "es.breaker.in_flight_requests.trip.total";

    private static final String ES_BREAKER_CUSTOM_TRIP_COUNT_TOTAL_TEMPLATE = "es.breaker.%s.trip.total";
    private final MeterRegistry registry;
    private final LongCounter parentTripCountTotal;
    private final LongCounter fielddataTripCountTotal;
    private final LongCounter requestTripCountTotal;
    private final LongCounter inFlightRequestsCountTotal;
    private final Map<String, LongCounter> customTripCountsTotal;

    private CircuitBreakerMetrics(
        final MeterRegistry registry,
        final LongCounter parentTripCountTotal,
        final LongCounter fielddataTripCountTotal,
        final LongCounter requestTripCountTotal,
        final LongCounter inFlightRequestsCountTotal,
        final Map<String, LongCounter> customTripCountsTotal
    ) {
        this.registry = registry;
        this.parentTripCountTotal = parentTripCountTotal;
        this.fielddataTripCountTotal = fielddataTripCountTotal;
        this.requestTripCountTotal = requestTripCountTotal;
        this.inFlightRequestsCountTotal = inFlightRequestsCountTotal;
        this.customTripCountsTotal = customTripCountsTotal;
    }

    /**
     * Registers the four built-in trip counters with the meter registry of the given telemetry provider.
     *
     * @param telemetryProvider  provider of the meter registry the counters are registered with
     * @param customTripCounters initial custom trip counters; the map is kept by reference, not copied
     */
    public CircuitBreakerMetrics(final TelemetryProvider telemetryProvider, final Map<String, LongCounter> customTripCounters) {
        this(
            telemetryProvider.getMeterRegistry(),
            telemetryProvider.getMeterRegistry()
                .registerLongCounter(ES_BREAKER_PARENT_TRIP_COUNT_TOTAL, "Parent circuit breaker trip count", "count"),
            telemetryProvider.getMeterRegistry()
                .registerLongCounter(ES_BREAKER_FIELD_DATA_TRIP_COUNT_TOTAL, "Field data circuit breaker trip count", "count"),
            telemetryProvider.getMeterRegistry()
                .registerLongCounter(ES_BREAKER_REQUEST_TRIP_COUNT_TOTAL, "Request circuit breaker trip count", "count"),
            telemetryProvider.getMeterRegistry()
                .registerLongCounter(
                    ES_BREAKER_IN_FLIGHT_REQUESTS_TRIP_COUNT_TOTAL,
                    "In-flight requests circuit breaker trip count",
                    "count"
                ),
            customTripCounters
        );
    }

    /** Builds the metric name under which the trip counter of the custom breaker {@code breakerName} is registered. */
    private static String canonicalName(final String breakerName) {
        return Strings.format(ES_BREAKER_CUSTOM_TRIP_COUNT_TOTAL_TEMPLATE, breakerName);
    }

    public LongCounter getParentTripCountTotal() {
        return parentTripCountTotal;
    }

    public LongCounter getFielddataTripCountTotal() {
        return fielddataTripCountTotal;
    }

    public LongCounter getRequestTripCountTotal() {
        return requestTripCountTotal;
    }

    public LongCounter getInFlightRequestsCountTotal() {
        return inFlightRequestsCountTotal;
    }

    /**
     * @return the map of custom trip counters exactly as held by this instance (no defensive copy)
     */
    public Map<String, LongCounter> getCustomTripCountsTotal() {
        return customTripCountsTotal;
    }

    /**
     * Looks up a custom trip counter.
     *
     * @param name       either a key exactly as stored in the custom counters map, or the plain name of a custom circuit breaker
     * @param theDefault the value returned when no counter is found under either form of the name
     * @return the matching counter, or {@code theDefault}
     */
    public LongCounter getCustomTripCount(final String name, final LongCounter theDefault) {
        // An entry stored under the exact key always wins, which keeps the behaviour of existing lookups unchanged.
        if (this.customTripCountsTotal.containsKey(name)) {
            return this.customTripCountsTotal.get(name);
        }
        // addCustomCircuitBreaker stores counters under the canonical metric name, so a lookup by plain breaker name must try it too.
        return this.customTripCountsTotal.getOrDefault(canonicalName(name), theDefault);
    }

    /**
     * Same as {@link #getCustomTripCount(String, LongCounter)} with {@link LongCounter#NOOP} as the default.
     *
     * @param name either a stored key or the plain name of a custom circuit breaker
     * @return the matching counter, or {@link LongCounter#NOOP}
     */
    public LongCounter getCustomTripCount(final String name) {
        return getCustomTripCount(name, LongCounter.NOOP);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        CircuitBreakerMetrics that = (CircuitBreakerMetrics) o;
        return Objects.equals(registry, that.registry)
            && Objects.equals(parentTripCountTotal, that.parentTripCountTotal)
            && Objects.equals(fielddataTripCountTotal, that.fielddataTripCountTotal)
            && Objects.equals(requestTripCountTotal, that.requestTripCountTotal)
            && Objects.equals(inFlightRequestsCountTotal, that.inFlightRequestsCountTotal)
            && Objects.equals(customTripCountsTotal, that.customTripCountsTotal);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
            registry,
            parentTripCountTotal,
            fielddataTripCountTotal,
            requestTripCountTotal,
            inFlightRequestsCountTotal,
            customTripCountsTotal
        );
    }

    @Override
    public String toString() {
        return "CircuitBreakerMetrics{"
            + "registry="
            + registry
            + ", parentTripCountTotal="
            + parentTripCountTotal
            + ", fielddataTripCountTotal="
            + fielddataTripCountTotal
            + ", requestTripCountTotal="
            + requestTripCountTotal
            + ", inFlightRequestsCountTotal="
            + inFlightRequestsCountTotal
            + ", customTripCountsTotal="
            + customTripCountsTotal
            + '}';
    }

    /**
     * Registers a trip counter for a custom circuit breaker and stores it under its canonical metric name.
     *
     * @param circuitBreaker the custom circuit breaker whose name is used to build the metric name
     * @throws IllegalArgumentException if a counter is already stored under the breaker's plain name or its canonical metric name
     */
    public void addCustomCircuitBreaker(final CircuitBreaker circuitBreaker) {
        final String breakerName = circuitBreaker.getName();
        final String canonicalName = canonicalName(breakerName);
        // Check the key that is actually written below as well as the plain name; checking only the plain name (as before)
        // never detects a breaker that was registered through this method.
        if (this.customTripCountsTotal.containsKey(breakerName) || this.customTripCountsTotal.containsKey(canonicalName)) {
            throw new IllegalArgumentException("A circuit breaker named [" + breakerName + "] already exists");
        }
        this.customTripCountsTotal.put(
            canonicalName,
            registry.registerLongCounter(canonicalName, "A custom circuit circuitBreaker [" + breakerName + "]", "count")
        );
    }

}
Loading

0 comments on commit 391acde

Please sign in to comment.