Profile the fetch phase (backport of elastic#77064)
This adds profiling to the fetch phase so we can tell when fetching is
slower than we'd like and we can tell which portion of the fetch is
slow. The output includes which stored fields were loaded, how long it
took to load stored fields, which fetch sub-phases were run, and how
long those fetch sub-phases took.

Closes elastic#75892
nik9000 committed Sep 13, 2021
1 parent 5087b9f commit 8ca4b3d
Showing 54 changed files with 1,905 additions and 547 deletions.
@@ -48,9 +48,9 @@
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
@@ -89,7 +89,7 @@
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.profile.ProfileResult;
import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult;
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.profile.aggregation.AggregationProfileShardResult;
import org.elasticsearch.search.profile.query.CollectorResult;
import org.elasticsearch.search.profile.query.QueryProfileShardResult;
@@ -499,15 +499,15 @@ public void testSearchRequestProfiling() throws IOException {

SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// tag::search-request-profiling-get
Map<String, SearchProfileQueryPhaseResult> profilingResults =
Map<String, SearchProfileShardResult> profilingResults =
searchResponse.getProfileResults(); // <1>
for (Map.Entry<String, SearchProfileQueryPhaseResult> profilingResult : profilingResults.entrySet()) { // <2>
for (Map.Entry<String, SearchProfileShardResult> profilingResult : profilingResults.entrySet()) { // <2>
String key = profilingResult.getKey(); // <3>
SearchProfileQueryPhaseResult profileShardResult = profilingResult.getValue(); // <4>
SearchProfileShardResult profileShardResult = profilingResult.getValue(); // <4>
}
// end::search-request-profiling-get

SearchProfileQueryPhaseResult profileShardResult = profilingResults.values().iterator().next();
SearchProfileShardResult profileShardResult = profilingResults.values().iterator().next();
assertNotNull(profileShardResult);
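
// A hedged, illustrative continuation -- not one of the snippets this commit
// documents -- showing how the new fetch profile could be read off the same
// result. It assumes SearchProfileShardResult#getFetchPhase() returns the
// fetch ProfileResult (as the integration tests further down suggest) and
// that it may be null when the shard fetched no documents.
ProfileResult fetchProfile = profileShardResult.getFetchPhase();
if (fetchProfile != null) {
    long fetchNanos = fetchProfile.getTime(); // total fetch time in nanoseconds
    Map<String, Long> fetchBreakdown = fetchProfile.getTimeBreakdown(); // next_reader, load_stored_fields, ...
    List<ProfileResult> fetchSubPhases = fetchProfile.getProfiledChildren(); // e.g. FetchSourcePhase
}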

// tag::search-request-profiling-queries
170 changes: 146 additions & 24 deletions docs/reference/search/profile.asciidoc
@@ -17,8 +17,8 @@ The Profile API gives the user insight into how search requests are executed at
a low level so that the user can understand why certain requests are slow, and
take steps to improve them. Note that the Profile API,
<<profile-limitations, amongst other things>>, doesn't measure network latency,
time spent in the search fetch phase, time spent while the requests spends in
queues or while merging shard responses on the coordinating node.
time the request spends in queues, or time spent merging shard
responses on the coordinating node.

The output from the Profile API is *very* verbose, especially for complicated
requests executed across many shards. Pretty-printing the response is
@@ -163,7 +163,37 @@ The API returns the following result:
]
}
],
"aggregations": []
"aggregations": [],
"fetch": {
"type": "fetch",
"description": "",
"time_in_nanos": 660555,
"breakdown": {
"next_reader": 7292,
"next_reader_count": 1,
"load_stored_fields": 299325,
"load_stored_fields_count": 5
},
"debug": {
"stored_fields": ["_id", "_routing", "_source"]
},
"children": [
{
"type": "FetchSourcePhase",
"description": "",
"time_in_nanos": 20443,
"breakdown": {
"next_reader": 745,
"next_reader_count": 1,
"process": 19698,
"process_count": 5
},
"debug": {
"fast_path": 5
}
}
]
}
}
]
}
@@ -196,7 +226,8 @@ The overall structure of the profile response is as follows:
"collector": [...] <4>
}
],
"aggregations": [...] <5>
"aggregations": [...], <5>
"fetch": {...} <6>
}
]
}
@@ -208,15 +239,14 @@ The overall structure of the profile response is as follows:
// TESTRESPONSE[s/"query": \[...\]/"query": $body.$_path/]
// TESTRESPONSE[s/"collector": \[...\]/"collector": $body.$_path/]
// TESTRESPONSE[s/"aggregations": \[...\]/"aggregations": []/]
// TESTRESPONSE[s/"fetch": \{...\}/"fetch": $body.$_path/]
<1> A profile is returned for each shard that participated in the response, and
is identified by a unique ID.
<2> Each profile contains a section which holds details about the query
execution.
<3> Each profile has a single time representing the cumulative rewrite time.
<4> Each profile also contains a section about the Lucene Collectors which run
the search.
<5> Each profile contains a section which holds the details about the
aggregation execution.
<2> Query timings and other debugging information.
<3> The cumulative rewrite time.
<4> Names and invocation timings for each collector.
<5> Aggregation timings, invocation counts, and debug information.
<6> Fetch timing and debug information.
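
To poke at this structure programmatically, here is a small, hypothetical
Java sketch (the class and method names, and the approach of parsing the
response into a `Map`, are assumptions for illustration and not part of this
commit) that walks a profile response down to the new per-shard `fetch`
section:

[source,java]
----
import java.util.List;
import java.util.Map;

public class ProfileNavigation {
    /**
     * Hypothetical helper: return the "fetch" profile of the first shard, or
     * null when the response has no fetch profile. Field names follow the
     * structure shown above.
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> fetchProfileOfFirstShard(Map<String, Object> response) {
        Map<String, Object> profile = (Map<String, Object>) response.get("profile");
        List<Map<String, Object>> shards = (List<Map<String, Object>>) profile.get("shards");
        return (Map<String, Object>) shards.get(0).get("fetch");
    }
}
----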

Because a search request may be executed against one or more shards in an index,
and a search may cover one or more indices, the top level element in the profile
@@ -295,7 +325,7 @@ Using our previous `match` query example, let's analyze the `query` section:
]
--------------------------------------------------
// TESTRESPONSE[s/^/{\n"took": $body.took,\n"timed_out": $body.timed_out,\n"_shards": $body._shards,\n"hits": $body.hits,\n"profile": {\n"shards": [ {\n"id": "$body.$_path",\n"searches": [{\n/]
// TESTRESPONSE[s/]$/],"rewrite_time": $body.$_path, "collector": $body.$_path}], "aggregations": []}]}}/]
// TESTRESPONSE[s/]$/],"rewrite_time": $body.$_path, "collector": $body.$_path}], "aggregations": [], "fetch": $body.$_path}]}}/]
// TESTRESPONSE[s/(?<=[" ])\d+(\.\d+)?/$body.$_path/]
// TESTRESPONSE[s/"breakdown": \{...\}/"breakdown": $body.$_path/]
<1> The breakdown timings are omitted for simplicity.
@@ -347,7 +377,7 @@ Lucene execution:
}
--------------------------------------------------
// TESTRESPONSE[s/^/{\n"took": $body.took,\n"timed_out": $body.timed_out,\n"_shards": $body._shards,\n"hits": $body.hits,\n"profile": {\n"shards": [ {\n"id": "$body.$_path",\n"searches": [{\n"query": [{\n"type": "BooleanQuery",\n"description": "message:get message:search",\n"time_in_nanos": $body.$_path,/]
// TESTRESPONSE[s/}$/},\n"children": $body.$_path}],\n"rewrite_time": $body.$_path, "collector": $body.$_path}], "aggregations": []}]}}/]
// TESTRESPONSE[s/}$/},\n"children": $body.$_path}],\n"rewrite_time": $body.$_path, "collector": $body.$_path}], "aggregations": [], "fetch": $body.$_path}]}}/]
// TESTRESPONSE[s/(?<=[" ])\d+(\.\d+)?/$body.$_path/]

Timings are listed in wall-clock nanoseconds and are not normalized at all. All
@@ -448,7 +478,7 @@ Looking at the previous example:
]
--------------------------------------------------
// TESTRESPONSE[s/^/{\n"took": $body.took,\n"timed_out": $body.timed_out,\n"_shards": $body._shards,\n"hits": $body.hits,\n"profile": {\n"shards": [ {\n"id": "$body.$_path",\n"searches": [{\n"query": $body.$_path,\n"rewrite_time": $body.$_path,/]
// TESTRESPONSE[s/]$/]}], "aggregations": []}]}}/]
// TESTRESPONSE[s/]$/]}], "aggregations": [], "fetch": $body.$_path}]}}/]
// TESTRESPONSE[s/(?<=[" ])\d+(\.\d+)?/$body.$_path/]


@@ -569,7 +599,7 @@ GET /my-index-000001/_search
}
--------------------------------------------------
// TEST[setup:my_index]
// TEST[s/_search/_search\?filter_path=profile.shards.id,profile.shards.searches,profile.shards.aggregations/]
// TEST[s/_search/_search\?filter_path=profile.shards.id,profile.shards.searches,profile.shards.aggregations,profile.shards.fetch/]


This example has:
@@ -673,13 +703,15 @@ The API returns the following result:
]
}
],
"aggregations": [...] <1>
"aggregations": [...], <1>
"fetch": {...}
}
]
}
}
--------------------------------------------------
// TESTRESPONSE[s/"aggregations": \[\.\.\.\]/"aggregations": $body.$_path/]
// TESTRESPONSE[s/"fetch": \{\.\.\.\}/"fetch": $body.$_path/]
// TESTRESPONSE[s/\.\.\.//]
// TESTRESPONSE[s/(?<=[" ])\d+(\.\d+)?/$body.$_path/]
// TESTRESPONSE[s/"id": "\[P6-vulHtQRWuD4YnubWb7A\]\[my-index-000001\]\[0\]"/"id": $body.profile.shards.0.id/]
@@ -918,6 +950,99 @@ to give you a feel for A) what machinery in {es} is actually eating time, and B)
the magnitude of differences in times between the various components. Like the
overall time, the breakdown is inclusive of all children times.

[[profiling-fetch]]
===== Profiling Fetch


All shards that fetched documents will have a `fetch` section in the profile.
Let's execute a small search and have a look at the fetch profile:

[source,console]
----
GET /my-index-000001/_search?filter_path=profile.shards.fetch
{
"profile": true,
"query": {
"term": {
"user.id": {
"value": "elkbee"
}
}
}
}
----
// TEST[continued]

And here is the fetch profile:

[source,console-result]
----
{
"profile": {
"shards": [
{
"fetch": {
"type": "fetch",
"description": "",
"time_in_nanos": 660555,
"breakdown": {
"next_reader": 7292,
"next_reader_count": 1,
"load_stored_fields": 299325,
"load_stored_fields_count": 5
},
"debug": {
"stored_fields": ["_id", "_routing", "_source"]
},
"children": [
{
"type": "FetchSourcePhase",
"description": "",
"time_in_nanos": 20443,
"breakdown": {
"next_reader": 745,
"next_reader_count": 1,
"process": 19698,
"process_count": 5
},
"debug": {
"fast_path": 4
}
}
]
}
}
]
}
}
----
// TESTRESPONSE[s/(?<=[" ])\d+(\.\d+)?/$body.$_path/]

Since this is debugging information about the way that Elasticsearch executes
the fetch, it can change from request to request and version to version. Even
patch versions may change the output here. That lack of consistency is what
makes it useful for debugging.

Anyway! `time_in_nanos` measures the total time of the fetch phase.
The `breakdown` counts and times our
per-link:{glossary}/terms.html#glossary-segment[segment] preparation in
`next_reader` and the time taken loading stored fields in `load_stored_fields`.
The `debug` section contains miscellaneous non-timing information; in
particular, `stored_fields` lists the stored fields that the fetch will have to
load. If it is an empty list then the fetch will entirely skip loading stored
fields.

The `children` section lists the sub-phases that do the actual fetching work,
and each child's `breakdown` has counts and timings for the
per-link:{glossary}/terms.html#glossary-segment[segment] preparation in
`next_reader` and the per-document fetching in `process`.
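
To make the relationship between the parent `breakdown` and the `children`
concrete, here is a hypothetical sketch (not part of this commit; the class
is invented and the accessor names are assumptions based on the
`ProfileResult` class the commit already uses) that prints where the fetch
time went:

[source,java]
----
import java.util.Map;

import org.elasticsearch.search.profile.ProfileResult;

public class FetchProfilePrinter {
    /** Hypothetical helper: dump the fetch breakdown and its sub-phases. */
    public static void print(ProfileResult fetch) {
        Map<String, Long> breakdown = fetch.getTimeBreakdown();
        System.out.printf("fetch total: %d ns%n", fetch.getTime());
        System.out.printf("  next_reader: %d ns over %d call(s)%n",
            breakdown.get("next_reader"), breakdown.get("next_reader_count"));
        System.out.printf("  load_stored_fields: %d ns over %d load(s)%n",
            breakdown.get("load_stored_fields"), breakdown.get("load_stored_fields_count"));
        for (ProfileResult subPhase : fetch.getProfiledChildren()) {
            // e.g. FetchSourcePhase, with its own next_reader/process breakdown
            System.out.printf("  %s: %d ns%n", subPhase.getQueryName(), subPhase.getTime());
        }
    }
}
----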

NOTE: We try hard to load all of the stored fields that we will need for the
fetch up front. This tends to make the `_source` phase a couple of microseconds
per hit. In that case the true cost of the `_source` phase is hidden in the
`load_stored_fields` component of the breakdown. It's possible to entirely skip
loading stored fields by setting
`"_source": false, "stored_fields": ["_none_"]`.
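
If you want to see that effect in a profile, a minimal sketch of the
equivalent request through the Java high-level REST client might look like the
following (the index name and query simply mirror the console example above,
and the builder calls should correspond to the standard `SearchSourceBuilder`
setters; this is an illustration, not a snippet from this commit):

[source,java]
----
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Skip stored-field loading entirely; the load_stored_fields component of
// the fetch breakdown should then drop to (near) zero.
SearchSourceBuilder source = new SearchSourceBuilder()
    .query(QueryBuilders.termQuery("user.id", "elkbee"))
    .profile(true)
    .fetchSource(false)        // "_source": false
    .storedField("_none_");    // "stored_fields": ["_none_"]
SearchRequest request = new SearchRequest("my-index-000001").source(source);
----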

[[profiling-considerations]]
===== Profiling Considerations

@@ -936,16 +1061,13 @@ have a drastic effect compared to other components in the profiled query.
[[profile-limitations]]
===== Limitations

- Profiling currently does not measure the search fetch phase nor the network
overhead.
- Profiling currently does not measure the network overhead.
- Profiling also does not account for time spent in the queue, merging shard
responses on the coordinating node, or additional work such as building global
ordinals (an internal data structure used to speed up search).
- Profiling statistics are currently not available for suggestions,
highlighting, `dfs_query_then_fetch`.
- Profiling of the reduce phase of aggregation is currently not available.
- The Profiler is still highly experimental. The Profiler is instrumenting parts
of Lucene that were never designed to be exposed in this manner, and so all
results should be viewed as a best effort to provide detailed diagnostics. We
hope to improve this over time. If you find obviously wrong numbers, strange
query structures, or other bugs, please report them!
- The Profiler is instrumenting internals that can change from version to
version. The resulting JSON should be considered mostly unstable, especially
things in the `debug` section.
@@ -145,3 +145,39 @@ teardown:
query:
match_all: {}
inner_hits: {}

---
profile fetch:
- skip:
version: ' - 7.99.99'
reason: fetch profiling implemented in 8.0.0 to be backported to 7.16.0

- do:
search:
index: test
body:
profile: true
query:
has_parent:
parent_type: question
query:
match_all: {}
inner_hits: {}

- gt: { profile.shards.0.fetch.time_in_nanos: 0 }
- gt: { profile.shards.0.fetch.breakdown.next_reader_count: 0 }
- gt: { profile.shards.0.fetch.breakdown.next_reader: 0 }
- gt: { profile.shards.0.fetch.breakdown.load_stored_fields_count: 0 }
- gt: { profile.shards.0.fetch.breakdown.load_stored_fields: 0 }
- match: { profile.shards.0.fetch.debug.stored_fields: [_id, _routing, _source] }
- length: { profile.shards.0.fetch.children: 2 }
- match: { profile.shards.0.fetch.children.0.type: FetchSourcePhase }
- gt: { profile.shards.0.fetch.children.0.breakdown.next_reader_count: 0 }
- gt: { profile.shards.0.fetch.children.0.breakdown.next_reader: 0 }
- match: { profile.shards.0.fetch.children.1.type: InnerHitsPhase }
- gt: { profile.shards.0.fetch.children.1.breakdown.next_reader_count: 0 }
- gt: { profile.shards.0.fetch.children.1.breakdown.next_reader: 0 }
@@ -97,10 +97,13 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static java.util.stream.Collectors.toList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;

/**
* This test class executes twice, first against the remote cluster, and then against another cluster that has the remote cluster
@@ -405,6 +408,10 @@ public void testProfile() throws Exception {
duelSearch(searchRequest, response -> {
assertHits(response);
assertFalse(response.getProfileResults().isEmpty());
assertThat(
response.getProfileResults().values().stream().filter(sr -> sr.getFetchPhase() != null).collect(toList()),
not(empty())
);
});
}

@@ -813,6 +820,14 @@ private static Map<String, Object> responseToMap(SearchResponse response) throws
List<Map<String, Object>> shards = (List <Map<String, Object>>)profile.get("shards");
for (Map<String, Object> shard : shards) {
replaceProfileTime(shard);
/*
* The way we try to reduce round trips is by fetching all
* of the results we could possibly need from the remote
* cluster and then merging *those* together locally. This
* will end up fetching more documents total. So we can't
* really compare the fetch profiles here.
*/
shard.remove("fetch");
}
}
return responseMap;