Skip to content

Commit

Permalink
Add support for aggregation profiler with concurrent aggregation (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#8801)

* Add support for aggregation profiler with concurrent aggregation (opensearch-project#8801)

Signed-off-by: Ticheng Lin <[email protected]>

* Address review comments for support for aggregation profiler with concurrent aggregation (opensearch-project#8801)

Signed-off-by: Ticheng Lin <[email protected]>

* Refactor ProfileResult class and add more tests

Signed-off-by: Ticheng Lin <[email protected]>

* Fix flaky QueryProfilePhaseTests.testCollapseQuerySearchResults test

Signed-off-by: Ticheng Lin <[email protected]>

---------

Signed-off-by: Ticheng Lin <[email protected]>
Signed-off-by: Ivan Brusic <[email protected]>
  • Loading branch information
ticheng-aws authored and brusic committed Sep 25, 2023
1 parent 1fbd7ef commit ba179f6
Show file tree
Hide file tree
Showing 18 changed files with 800 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Exclude 'benchmarks' from codecov report ([#8805](https://github.com/opensearch-project/OpenSearch/pull/8805))
- [Refactor] MediaTypeParser to MediaTypeParserRegistry ([#8636](https://github.com/opensearch-project/OpenSearch/pull/8636))
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))

### Deprecated

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.minimumScore(source.minScore());
}
if (source.profile()) {
context.setProfilers(new Profilers(context.searcher()));
context.setProfilers(new Profilers(context.searcher(), context.isConcurrentSegmentSearchEnabled()));
}
if (source.timeout() != null) {
context.timeout(source.timeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ public abstract class AbstractProfileBreakdown<T extends Enum<T>> {
/**
* The accumulated timings for this query node
*/
private final Timer[] timings;
private final T[] timingTypes;
protected final Timer[] timings;
protected final T[] timingTypes;
public static final String TIMING_TYPE_COUNT_SUFFIX = "_count";
public static final String TIMING_TYPE_START_TIME_SUFFIX = "_start_time";

/** Sole constructor. */
public AbstractProfileBreakdown(Class<T> clazz) {
Expand All @@ -74,17 +76,10 @@ public void setTimer(T timing, Timer timer) {
* Build a timing count breakdown for current instance
*/
public Map<String, Long> toBreakdownMap() {
return buildBreakdownMap(this);
}

/**
* Build a timing count breakdown for arbitrary instance
*/
protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T> breakdown) {
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 2);
for (T timingType : breakdown.timingTypes) {
map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount());
Map<String, Long> map = new HashMap<>(this.timings.length * 3);
for (T timingType : this.timingTypes) {
map.put(timingType.toString(), this.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings[timingType.ordinal()].getCount());
}
return Collections.unmodifiableMap(map);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.search.profile;

import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -44,8 +45,10 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand All @@ -69,14 +72,23 @@ public final class ProfileResult implements Writeable, ToXContentObject {
static final ParseField BREAKDOWN = new ParseField("breakdown");
static final ParseField DEBUG = new ParseField("debug");
static final ParseField NODE_TIME = new ParseField("time");
static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time");
static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time");
static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time");
static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos");
static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos");
static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos");
static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos");
static final ParseField CHILDREN = new ParseField("children");

private final String type;
private final String description;
private final Map<String, Long> breakdown;
private final Map<String, Object> debug;
private final long nodeTime;
private Long maxSliceNodeTime;
private Long minSliceNodeTime;
private Long avgSliceNodeTime;
private final List<ProfileResult> children;

public ProfileResult(
Expand All @@ -86,13 +98,30 @@ public ProfileResult(
Map<String, Object> debug,
long nodeTime,
List<ProfileResult> children
) {
this(type, description, breakdown, debug, nodeTime, children, null, null, null);
}

public ProfileResult(
String type,
String description,
Map<String, Long> breakdown,
Map<String, Object> debug,
long nodeTime,
List<ProfileResult> children,
Long maxSliceNodeTime,
Long minSliceNodeTime,
Long avgSliceNodeTime
) {
this.type = type;
this.description = description;
this.breakdown = Objects.requireNonNull(breakdown, "required breakdown argument missing");
this.debug = debug == null ? Map.of() : debug;
this.children = children == null ? List.of() : children;
this.nodeTime = nodeTime;
this.maxSliceNodeTime = maxSliceNodeTime;
this.minSliceNodeTime = minSliceNodeTime;
this.avgSliceNodeTime = avgSliceNodeTime;
}

/**
Expand All @@ -105,6 +134,15 @@ public ProfileResult(StreamInput in) throws IOException {
breakdown = in.readMap(StreamInput::readString, StreamInput::readLong);
debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue);
children = in.readList(ProfileResult::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.maxSliceNodeTime = in.readOptionalLong();
this.minSliceNodeTime = in.readOptionalLong();
this.avgSliceNodeTime = in.readOptionalLong();
} else {
this.maxSliceNodeTime = null;
this.minSliceNodeTime = null;
this.avgSliceNodeTime = null;
}
}

@Override
Expand All @@ -115,6 +153,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(breakdown, StreamOutput::writeString, StreamOutput::writeLong);
out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeList(children);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalLong(maxSliceNodeTime);
out.writeOptionalLong(minSliceNodeTime);
out.writeOptionalLong(avgSliceNodeTime);
}
}

/**
Expand Down Expand Up @@ -154,6 +197,18 @@ public long getTime() {
return nodeTime;
}

public Long getMaxSliceTime() {
return maxSliceNodeTime;
}

public Long getMinSliceTime() {
return minSliceNodeTime;
}

public Long getAvgSliceTime() {
return avgSliceNodeTime;
}

/**
* Returns a list of all profiled children queries
*/
Expand All @@ -168,9 +223,27 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DESCRIPTION.getPreferredName(), description);
if (builder.humanReadable()) {
builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString());
if (getMaxSliceTime() != null) {
builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString());
}
if (getMinSliceTime() != null) {
builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString());
}
if (getAvgSliceTime() != null) {
builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString());
}
}
builder.field(NODE_TIME_RAW.getPreferredName(), getTime());
builder.field(BREAKDOWN.getPreferredName(), breakdown);
if (getMaxSliceTime() != null) {
builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime());
}
if (getMinSliceTime() != null) {
builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime());
}
if (getAvgSliceTime() != null) {
builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime());
}
createBreakdownView(builder);
if (false == debug.isEmpty()) {
builder.field(DEBUG.getPreferredName(), debug);
}
Expand All @@ -186,6 +259,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder.endObject();
}

private void createBreakdownView(XContentBuilder builder) throws IOException {
Map<String, Long> modifiedBreakdown = new LinkedHashMap<>(breakdown);
removeStartTimeFields(modifiedBreakdown);
builder.field(BREAKDOWN.getPreferredName(), modifiedBreakdown);
}

static void removeStartTimeFields(Map<String, Long> modifiedBreakdown) {
Iterator<Map.Entry<String, Long>> iterator = modifiedBreakdown.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> entry = iterator.next();
if (entry.getKey().endsWith(AbstractProfileBreakdown.TIMING_TYPE_START_TIME_SUFFIX)) {
iterator.remove();
}
}
}

private static final InstantiatingObjectParser<ProfileResult, Void> PARSER;
static {
InstantiatingObjectParser.Builder<ProfileResult, Void> parser = InstantiatingObjectParser.builder(
Expand All @@ -199,6 +288,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
parser.declareObject(optionalConstructorArg(), (p, c) -> p.map(), DEBUG);
parser.declareLong(constructorArg(), NODE_TIME_RAW);
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> fromXContent(p), CHILDREN);
parser.declareLong(optionalConstructorArg(), MAX_SLICE_NODE_TIME_RAW);
parser.declareLong(optionalConstructorArg(), MIN_SLICE_NODE_TIME_RAW);
parser.declareLong(optionalConstructorArg(), AVG_SLICE_NODE_TIME_RAW);
PARSER = parser.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
Expand All @@ -50,18 +51,20 @@ public final class Profilers {
private final ContextIndexSearcher searcher;
private final List<QueryProfiler> queryProfilers;
private final AggregationProfiler aggProfiler;
private final boolean isConcurrentSegmentSearchEnabled;

/** Sole constructor. This {@link Profilers} instance will initially wrap one {@link QueryProfiler}. */
public Profilers(ContextIndexSearcher searcher) {
public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearchEnabled) {
this.searcher = searcher;
this.isConcurrentSegmentSearchEnabled = isConcurrentSegmentSearchEnabled;
this.queryProfilers = new ArrayList<>();
this.aggProfiler = new AggregationProfiler();
this.aggProfiler = isConcurrentSegmentSearchEnabled ? new ConcurrentAggregationProfiler() : new AggregationProfiler();
addQueryProfiler();
}

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = new QueryProfiler(searcher.getExecutor() != null);
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/search/profile/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
public class Timer {

private boolean doTiming;
private long timing, count, lastCount, start;
private long timing, count, lastCount, start, earliestTimerStartTime;

/** pkg-private for testing */
long nanoTime() {
Expand All @@ -71,6 +71,9 @@ public final void start() {
doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024);
if (doTiming) {
start = nanoTime();
if (count == 0) {
earliestTimerStartTime = start;
}
}
count++;
}
Expand All @@ -92,6 +95,14 @@ public final long getCount() {
return count;
}

/** Return the timer start time in nanoseconds.*/
public final long getEarliestTimerStartTime() {
if (start != 0) {
throw new IllegalStateException("#start call misses a matching #stop call");
}
return earliestTimerStartTime;
}

/** Return an approximation of the total time spent between consecutive calls of #start and #stop. */
public final long getApproximateTiming() {
if (start != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.search.profile.AbstractProfileBreakdown;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -62,4 +63,18 @@ public void addDebugInfo(String key, Object value) {
protected Map<String, Object> toDebugMap() {
return unmodifiableMap(extra);
}

/**
* Build a timing count startTime breakdown for aggregation timing types
*/
@Override
public Map<String, Long> toBreakdownMap() {
Map<String, Long> map = new HashMap<>(timings.length * 3);
for (AggregationTimingType timingType : timingTypes) {
map.put(timingType.toString(), timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, timings[timingType.ordinal()].getCount());
map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, timings[timingType.ordinal()].getEarliestTimerStartTime());
}
return Collections.unmodifiableMap(map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.opensearch.search.profile.AbstractProfiler;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -47,29 +45,25 @@
*/
public class AggregationProfiler extends AbstractProfiler<AggregationProfileBreakdown, Aggregator> {

private final Map<List<String>, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();
private final Map<Aggregator, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();

public AggregationProfiler() {
super(new InternalAggregationProfileTree());
}

/**
* This method does not need to be thread safe for concurrent search use case as well.
* The {@link AggregationProfileBreakdown} for each Aggregation operator is created in sync path when
* {@link org.opensearch.search.aggregations.BucketCollector#preCollection()} is called
* on the Aggregation collector instances during construction.
*/
@Override
public AggregationProfileBreakdown getQueryBreakdown(Aggregator agg) {
List<String> path = getAggregatorPath(agg);
AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(path);
AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(agg);
if (aggregationProfileBreakdown == null) {
aggregationProfileBreakdown = super.getQueryBreakdown(agg);
profileBreakdownLookup.put(path, aggregationProfileBreakdown);
profileBreakdownLookup.put(agg, aggregationProfileBreakdown);
}
return aggregationProfileBreakdown;
}

public static List<String> getAggregatorPath(Aggregator agg) {
LinkedList<String> path = new LinkedList<>();
while (agg != null) {
path.addFirst(agg.name());
agg = agg.parent();
}
return path;
}
}
Loading

0 comments on commit ba179f6

Please sign in to comment.