Skip to content

Commit

Permalink
Introduce durability of circuit breaking exception
Browse files Browse the repository at this point in the history
With this commit we differentiate between permanent circuit breaking
exceptions (which require intervention from an operator and should not
be automatically retried) and transient ones (which may heal themselves
eventually and should be retried). Furthermore, the parent circuit
breaker will categorize a circuit breaking exception as either transient
or permanent based on the categorization of memory usage of its child
circuit breakers.

Closes #31986
Relates #34460
  • Loading branch information
danielmitterdorfer authored Nov 2, 2018
1 parent 0aa66b0 commit ccbe80c
Show file tree
Hide file tree
Showing 16 changed files with 183 additions and 62 deletions.
2 changes: 2 additions & 0 deletions docs/reference/release-notes/7.0.0-alpha1.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Cross-Cluster-Search::

Rest API::
* The Clear Cache API only supports `POST` as HTTP method
* `CircuitBreakingException` was previously mapped to HTTP status code 503 and is now
mapped as HTTP status code 429.

Aggregations::
* The Percentiles and PercentileRanks aggregations now return `null` in the REST response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -63,7 +64,7 @@ public class RankEvalResponseTests extends ESTestCase {

private static final Exception[] RANDOM_EXCEPTIONS = new Exception[] {
new ClusterBlockException(singleton(DiscoverySettings.NO_MASTER_BLOCK_WRITES)),
new CircuitBreakingException("Data too large", 123, 456),
new CircuitBreakingException("Data too large", 123, 456, CircuitBreaker.Durability.PERMANENT),
new SearchParseException(new TestSearchContext(null), "Parse failure", new XContentLocation(12, 98)),
new IllegalArgumentException("Closed resource", new RuntimeException("Resource")),
new SearchPhaseExecutionException("search", "all shards failed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testLimitsInFlightRequests() throws Exception {
Collection<FullHttpResponse> multipleResponses = nettyHttpClient.post(transportAddress.address(), requests);
try {
assertThat(multipleResponses, hasSize(requests.length));
assertAtLeastOnceExpectedStatus(multipleResponses, HttpResponseStatus.SERVICE_UNAVAILABLE);
assertAtLeastOnceExpectedStatus(multipleResponses, HttpResponseStatus.TOO_MANY_REQUESTS);
} finally {
multipleResponses.forEach(ReferenceCounted::release);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
public class ChildMemoryCircuitBreaker implements CircuitBreaker {

private final long memoryBytesLimit;
private final BreakerSettings settings;
private final double overheadConstant;
private final Durability durability;
private final AtomicLong used;
private final AtomicLong trippedCount;
private final Logger logger;
Expand Down Expand Up @@ -66,9 +66,9 @@ public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger,
public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
Logger logger, HierarchyCircuitBreakerService parent, String name) {
this.name = name;
this.settings = settings;
this.memoryBytesLimit = settings.getLimit();
this.overheadConstant = settings.getOverhead();
this.durability = settings.getDurability();
if (oldBreaker == null) {
this.used = new AtomicLong(0);
this.trippedCount = new AtomicLong(0);
Expand All @@ -78,7 +78,7 @@ public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBre
}
this.logger = logger;
if (logger.isTraceEnabled()) {
logger.trace("creating ChildCircuitBreaker with settings {}", this.settings);
logger.trace("creating ChildCircuitBreaker with settings {}", settings);
}
this.parent = parent;
}
Expand All @@ -95,7 +95,7 @@ public void circuitBreak(String fieldName, long bytesNeeded) {
", which is larger than the limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
logger.debug("{}", message);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
}

/**
Expand Down Expand Up @@ -234,4 +234,12 @@ public long getTrippedCount() {
public String getName() {
return this.name;
}

/**
* @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
*/
@Override
public Durability getDurability() {
return this.durability;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public static Type parseValue(String value) {
}
}

enum Durability {
// The condition that tripped the circuit breaker fixes itself eventually.
TRANSIENT,
// The condition that tripped the circuit breaker requires manual intervention.
PERMANENT
}

/**
* Trip the circuit breaker
* @param fieldName name of the field responsible for tripping the breaker
Expand Down Expand Up @@ -127,4 +134,9 @@ public static Type parseValue(String value) {
* @return the name of the breaker
*/
String getName();

/**
* @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
*/
Durability getDurability();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.common.breaker;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -33,30 +34,38 @@ public class CircuitBreakingException extends ElasticsearchException {

private final long bytesWanted;
private final long byteLimit;

public CircuitBreakingException(String message) {
super(message);
this.bytesWanted = 0;
this.byteLimit = 0;
}
private final CircuitBreaker.Durability durability;

public CircuitBreakingException(StreamInput in) throws IOException {
super(in);
byteLimit = in.readLong();
bytesWanted = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
durability = in.readEnum(CircuitBreaker.Durability.class);
} else {
durability = CircuitBreaker.Durability.PERMANENT;
}
}

public CircuitBreakingException(String message, long bytesWanted, long byteLimit) {
public CircuitBreakingException(String message, CircuitBreaker.Durability durability) {
this(message, 0, 0, durability);
}

public CircuitBreakingException(String message, long bytesWanted, long byteLimit, CircuitBreaker.Durability durability) {
super(message);
this.bytesWanted = bytesWanted;
this.byteLimit = byteLimit;
this.durability = durability;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(byteLimit);
out.writeLong(bytesWanted);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeEnum(durability);
}
}

public long getBytesWanted() {
Expand All @@ -67,14 +76,19 @@ public long getByteLimit() {
return this.byteLimit;
}

public CircuitBreaker.Durability getDurability() {
return durability;
}

@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
return RestStatus.TOO_MANY_REQUESTS;
}

@Override
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("bytes_wanted", bytesWanted);
builder.field("bytes_limit", byteLimit);
builder.field("durability", durability);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreak
", which is larger than the limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
logger.debug("{}", message);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, Durability.PERMANENT);
}

/**
Expand Down Expand Up @@ -197,4 +197,9 @@ public long getTrippedCount() {
public String getName() {
return FIELDDATA;
}

@Override
public Durability getDurability() {
return Durability.PERMANENT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ public long getTrippedCount() {
public String getName() {
return this.name;
}

@Override
public Durability getDurability() {
return Durability.PERMANENT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@
/**
* Settings for a {@link CircuitBreaker}
*/
public class BreakerSettings {
public final class BreakerSettings {

private final String name;
private final long limitBytes;
private final double overhead;
private final CircuitBreaker.Type type;
private final CircuitBreaker.Durability durability;

public BreakerSettings(String name, long limitBytes, double overhead) {
this(name, limitBytes, overhead, CircuitBreaker.Type.MEMORY);
this(name, limitBytes, overhead, CircuitBreaker.Type.MEMORY, CircuitBreaker.Durability.PERMANENT);
}

public BreakerSettings(String name, long limitBytes, double overhead, CircuitBreaker.Type type) {
public BreakerSettings(String name, long limitBytes, double overhead, CircuitBreaker.Type type, CircuitBreaker.Durability durability) {
this.name = name;
this.limitBytes = limitBytes;
this.overhead = overhead;
this.type = type;
this.durability = durability;
}

public String getName() {
Expand All @@ -59,10 +61,15 @@ public CircuitBreaker.Type getType() {
return this.type;
}

public CircuitBreaker.Durability getDurability() {
return durability;
}

@Override
public String toString() {
return "[" + this.name +
",type=" + this.type.toString() +
",durability=" + this.durability.toString() +
",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) +
",overhead=" + this.overhead + "]";
}
Expand Down
Loading

0 comments on commit ccbe80c

Please sign in to comment.