Skip to content

Commit

Permalink
Make MultiBucketConsumerService thread safe to use across slices duri…
Browse files Browse the repository at this point in the history
…ng search

Signed-off-by: Neetika Singhal <[email protected]>
  • Loading branch information
neetikasinghal committed Aug 1, 2023
1 parent cc641eb commit 2bafdef
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))

- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Setting;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.search.aggregations.bucket.BucketsAggregator;

import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;

/**
Expand Down Expand Up @@ -129,11 +131,13 @@ public static class MultiBucketConsumer implements IntConsumer {

// aggregations execute in a single thread so no atomic here
private int count;
private int callCount = 0;
private LongAdder callCount;
private volatile boolean circuitBreakerTripped;

public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
this.limit = limit;
this.breaker = breaker;
callCount = new LongAdder();
}

@Override
Expand All @@ -153,10 +157,24 @@ public void accept(int value) {
);
}
}
// check parent circuit breaker every 1024 calls
callCount++;
if ((callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
callCount.increment();
// tripping the circuit breaker for other threads in case of concurrent search
// if the circuit breaker has tripped for one of the threads already, more info
// can be found on: https://github.com/opensearch-project/OpenSearch/issues/7785
if (circuitBreakerTripped) {
throw new CircuitBreakingException(
"Circuit breaker has tripped for one of the other threads",
CircuitBreaker.Durability.PERMANENT
);
}
// check parent circuit breaker every 1024 to (1024 + available processors) calls
if ((callCount.sum() & 0x3FF) <= Runtime.getRuntime().availableProcessors()) {
try {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
} catch (CircuitBreakingException e) {
circuitBreakerTripped = true;
throw e;
}
}
}

Expand Down

0 comments on commit 2bafdef

Please sign in to comment.