[ML] optimize source extraction for categorize_text aggregation (#79099)
This optimizes text value extraction from source in the categorize_text aggregation.

Early measurements indicate that the bulk of the time spent in this aggregation goes to inflating and deserializing the source. We can optimize this a bit (for larger sources) by extracting only the text field we care about.

The main downside is that if a sub-aggregation requires the source, that sub-aggregation will need to extract the entire source again. This should be a rare case.

NOTE: opening as a draft, as measurements on realistic data are needed to confirm that this actually saves time.

This takes advantage of the work done here: #77154
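As a minimal sketch of the idea (the "message" field name is purely illustrative, and sourceBytes stands in for a document's source as a BytesReference):

// Before: the entire source is deserialized into a map, then the field is pulled out
Map<String, Object> full = XContentHelper.convertToMap(sourceBytes, false, XContentType.JSON).v2();
List<Object> values = XContentMapValues.extractRawValues("message", full);

// After: a FilterPath pushes the field selection down into the parser, so only the
// requested field is materialized into the map (the parser still scans the other tokens)
FilterPath[] includes = FilterPath.compile(Set.of("message"));
Map<String, Object> onlyMessage = XContentHelper.convertToMap(sourceBytes, false, XContentType.JSON, includes, null).v2();
List<Object> filteredValues = XContentMapValues.extractRawValues("message", onlyMessage);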
benwtrent authored Oct 21, 2021
1 parent e288a1a commit 4e8ed09
Showing 8 changed files with 199 additions and 12 deletions.
XContent.java
@@ -82,6 +82,10 @@
 XContentParser createParser(NamedXContentRegistry xContentRegistry,
     DeprecationHandler deprecationHandler, byte[] data, int offset, int length) throws IOException;
 
+XContentParser createParser(NamedXContentRegistry xContentRegistry,
+    DeprecationHandler deprecationHandler, byte[] data, int offset, int length, FilterPath[] includes,
+    FilterPath[] excludes) throws IOException;
+
 /**
  * Creates a parser over the provided reader.
  */
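For reference, a minimal sketch of how a caller might use the new overload, assuming JSON content and a hypothetical "message" include path (JsonXContent.jsonXContent is the JSON implementation of this interface):

byte[] data = "{\"message\":\"foo bar\",\"other\":1}".getBytes(StandardCharsets.UTF_8);
FilterPath[] includes = FilterPath.compile(Set.of("message"));
try (XContentParser parser = JsonXContent.jsonXContent.createParser(
        NamedXContentRegistry.EMPTY,
        DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
        data, 0, data.length,
        includes,
        null // no exclude filters
)) {
    Map<String, Object> onlyMessage = parser.map(); // contains just "message"
}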
CborXContent.java
@@ -112,6 +112,26 @@
     return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
 }
 
+@Override
+public XContentParser createParser(
+    NamedXContentRegistry xContentRegistry,
+    DeprecationHandler deprecationHandler,
+    byte[] data,
+    int offset,
+    int length,
+    FilterPath[] includes,
+    FilterPath[] excludes
+) throws IOException {
+    return new CborXContentParser(
+        xContentRegistry,
+        deprecationHandler,
+        cborFactory.createParser(new ByteArrayInputStream(data, offset, length)),
+        RestApiVersion.current(),
+        includes,
+        excludes
+    );
+}
+
 @Override
 public XContentParser createParser(NamedXContentRegistry xContentRegistry,
     DeprecationHandler deprecationHandler, Reader reader) throws IOException {
JsonXContent.java
@@ -113,6 +113,26 @@
     return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
 }
 
+@Override
+public XContentParser createParser(
+    NamedXContentRegistry xContentRegistry,
+    DeprecationHandler deprecationHandler,
+    byte[] data,
+    int offset,
+    int length,
+    FilterPath[] includes,
+    FilterPath[] excludes
+) throws IOException {
+    return new JsonXContentParser(
+        xContentRegistry,
+        deprecationHandler,
+        jsonFactory.createParser(new ByteArrayInputStream(data, offset, length)),
+        RestApiVersion.current(),
+        includes,
+        excludes
+    );
+}
+
 @Override
 public XContentParser createParser(NamedXContentRegistry xContentRegistry,
     DeprecationHandler deprecationHandler, Reader reader) throws IOException {
SmileXContent.java
@@ -114,6 +114,26 @@
     return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
 }
 
+@Override
+public XContentParser createParser(
+    NamedXContentRegistry xContentRegistry,
+    DeprecationHandler deprecationHandler,
+    byte[] data,
+    int offset,
+    int length,
+    FilterPath[] includes,
+    FilterPath[] excludes
+) throws IOException {
+    return new SmileXContentParser(
+        xContentRegistry,
+        deprecationHandler,
+        smileFactory.createParser(new ByteArrayInputStream(data, offset, length)),
+        RestApiVersion.current(),
+        includes,
+        excludes
+    );
+}
+
 @Override
 public XContentParser createParser(NamedXContentRegistry xContentRegistry,
     DeprecationHandler deprecationHandler, Reader reader) throws IOException {
YamlXContent.java
@@ -106,6 +106,26 @@
     return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
 }
 
+@Override
+public XContentParser createParser(
+    NamedXContentRegistry xContentRegistry,
+    DeprecationHandler deprecationHandler,
+    byte[] data,
+    int offset,
+    int length,
+    FilterPath[] includes,
+    FilterPath[] excludes
+) throws IOException {
+    return new YamlXContentParser(
+        xContentRegistry,
+        deprecationHandler,
+        yamlFactory.createParser(new ByteArrayInputStream(data, offset, length)),
+        RestApiVersion.current(),
+        includes,
+        excludes
+    );
+}
+
 @Override
 public XContentParser createParser(NamedXContentRegistry xContentRegistry,
     DeprecationHandler deprecationHandler, Reader reader) throws IOException {
XContentHelper.java
@@ -13,6 +13,7 @@
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
@@ -27,6 +28,7 @@
 import org.elasticsearch.xcontent.XContentParseException;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.support.filtering.FilterPath;
 
 import java.io.BufferedInputStream;
 import java.io.IOException;
@@ -101,6 +103,14 @@
     return convertToMap(bytes, ordered, null);
 }
 
+/**
+ * Exactly the same as {@link XContentHelper#convertToMap(BytesReference, boolean, XContentType, FilterPath[], FilterPath[])} but
+ * none of the fields are filtered.
+ */
+public static Tuple<XContentType, Map<String, Object>> convertToMap(BytesReference bytes, boolean ordered, XContentType xContentType) {
+    return convertToMap(bytes, ordered, xContentType, null, null);
+}
+
 /**
  * Converts the given bytes into a map that is optionally ordered. The provided {@link XContentType} must be non-null.
  * <p>
@@ -110,8 +120,13 @@
  * frequently when folks write nanosecond precision dates as a decimal
  * number.
  */
-public static Tuple<XContentType, Map<String, Object>> convertToMap(BytesReference bytes, boolean ordered, XContentType xContentType)
-    throws ElasticsearchParseException {
+public static Tuple<XContentType, Map<String, Object>> convertToMap(
+    BytesReference bytes,
+    boolean ordered,
+    XContentType xContentType,
+    @Nullable FilterPath[] include,
+    @Nullable FilterPath[] exclude
+) throws ElasticsearchParseException {
     try {
         final XContentType contentType;
         InputStream input;
@@ -129,14 +144,16 @@
             final int length = bytes.length();
             contentType = xContentType != null ? xContentType : XContentFactory.xContentType(raw, offset, length);
             return new Tuple<>(Objects.requireNonNull(contentType),
-                convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered));
+                convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered, include, exclude));
         } else {
             input = bytes.streamInput();
             contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input);
         }
         try (InputStream stream = input) {
-            return new Tuple<>(Objects.requireNonNull(contentType),
-                convertToMap(XContentFactory.xContent(contentType), stream, ordered));
+            return new Tuple<>(
+                Objects.requireNonNull(contentType),
+                convertToMap(XContentFactory.xContent(contentType), stream, ordered, include, exclude)
+            );
         }
     } catch (IOException e) {
         throw new ElasticsearchParseException("Failed to parse content to map", e);
@@ -158,14 +175,35 @@
 }
 
 /**
- * Convert a string in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any
- * error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input.
+ * The same as {@link XContentHelper#convertToMap(XContent, byte[], int, int, boolean, FilterPath[], FilterPath[])} but none of the
+ * fields are filtered.
  */
 public static Map<String, Object> convertToMap(XContent xContent, InputStream input, boolean ordered)
     throws ElasticsearchParseException {
+    return convertToMap(xContent, input, ordered, null, null);
+}
+
+/**
+ * Convert a string in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any
+ * error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input.
+ *
+ * Additionally, fields may be included or excluded from the parsing.
+ */
+public static Map<String, Object> convertToMap(
+    XContent xContent,
+    InputStream input,
+    boolean ordered,
+    @Nullable FilterPath[] include,
+    @Nullable FilterPath[] exclude
+) throws ElasticsearchParseException {
     // It is safe to use EMPTY here because this never uses namedObject
-    try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY,
-        DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input)) {
+    try (XContentParser parser = xContent.createParser(
+        NamedXContentRegistry.EMPTY,
+        DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+        input,
+        include,
+        exclude
+    )) {
         return ordered ? parser.mapOrdered() : parser.map();
     } catch (IOException e) {
         throw new ElasticsearchParseException("Failed to parse content to map", e);
@@ -178,9 +216,35 @@
  */
 public static Map<String, Object> convertToMap(XContent xContent, byte[] bytes, int offset, int length, boolean ordered)
     throws ElasticsearchParseException {
+    return convertToMap(xContent, bytes, offset, length, ordered, null, null);
+}
+
+/**
+ * Convert a byte array in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any
+ * error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input.
+ *
+ * Unlike {@link XContentHelper#convertToMap(XContent, byte[], int, int, boolean)} this optionally accepts fields to include or exclude
+ * during XContent parsing.
+ */
+public static Map<String, Object> convertToMap(
+    XContent xContent,
+    byte[] bytes,
+    int offset,
+    int length,
+    boolean ordered,
+    @Nullable FilterPath[] include,
+    @Nullable FilterPath[] exclude
+) throws ElasticsearchParseException {
     // It is safe to use EMPTY here because this never uses namedObject
-    try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY,
-        DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes, offset, length)) {
+    try (XContentParser parser = xContent.createParser(
+        NamedXContentRegistry.EMPTY,
+        DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+        bytes,
+        offset,
+        length,
+        include,
+        exclude)
+    ) {
         return ordered ? parser.mapOrdered() : parser.map();
     } catch (IOException e) {
         throw new ElasticsearchParseException("Failed to parse content to map", e);
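A short usage sketch of the new filtered byte-array overload (the document and paths are illustrative):

byte[] data = "{\"message\":\"disk full\",\"host\":\"web-1\"}".getBytes(StandardCharsets.UTF_8);
Map<String, Object> map = XContentHelper.convertToMap(
    JsonXContent.jsonXContent, data, 0, data.length, false,
    FilterPath.compile(Set.of("message")), // include paths
    null // exclude paths
); // map contains only the "message" entry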
SourceLookup.java
@@ -20,6 +20,7 @@
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.xcontent.support.filtering.FilterPath;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -157,6 +158,44 @@
     return XContentMapValues.extractRawValues(path, source());
 }
 
+/**
+ * Returns the values associated with the path. These are "low-level" values, and the method can
+ * handle path expressions that navigate into arrays/lists.
+ *
+ * The major differences from {@link SourceLookup#extractRawValues(String)} are that this version:
+ *
+ * - does not cache the source if it is not already parsed
+ * - extracts only the desired values from the compressed source instead of deserializing the whole object
+ *
+ * This is useful when the caller only wants a single value from source and does not care whether the source is
+ * fully parsed and cached for later use.
+ * @param path The path from which to extract the values from source
+ * @return The list of found values, or an empty list if none are found
+ */
+public List<Object> extractRawValuesWithoutCaching(String path) {
+    if (source != null) {
+        return XContentMapValues.extractRawValues(path, source);
+    }
+    FilterPath[] filterPaths = FilterPath.compile(Set.of(path));
+    if (sourceAsBytes != null) {
+        return XContentMapValues.extractRawValues(
+            path,
+            XContentHelper.convertToMap(sourceAsBytes, false, null, filterPaths, null).v2()
+        );
+    }
+    try {
+        FieldsVisitor sourceFieldVisitor = new FieldsVisitor(true);
+        fieldReader.accept(docId, sourceFieldVisitor);
+        BytesReference source = sourceFieldVisitor.source();
+        return XContentMapValues.extractRawValues(
+            path,
+            XContentHelper.convertToMap(source, false, null, filterPaths, null).v2()
+        );
+    } catch (Exception e) {
+        throw new ElasticsearchParseException("failed to parse / load source", e);
+    }
+}
+
 /**
  * For the provided path, return its value in the source.
  *
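A sketch of the intended call pattern, mirroring the aggregator change below (the "message" path is illustrative):

sourceLookup.setSegmentAndDocument(ctx, doc);
// Deserializes only the "message" field and leaves the lookup's source cache untouched,
// so a sub-aggregation that later calls source() will parse the full source on demand.
List<Object> values = sourceLookup.extractRawValuesWithoutCaching("message");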
CategorizeTextAggregator.java
@@ -183,7 +183,7 @@

 private void collectFromSource(int doc, long owningBucketOrd, CategorizationTokenTree categorizer) throws IOException {
     sourceLookup.setSegmentAndDocument(ctx, doc);
-    Iterator<String> itr = sourceLookup.extractRawValues(sourceFieldName).stream().map(obj -> {
+    Iterator<String> itr = sourceLookup.extractRawValuesWithoutCaching(sourceFieldName).stream().map(obj -> {
         if (obj == null) {
             return null;
         }
