[ML] Optimize source extraction for categorize_text aggregation #79099

Merged
@@ -82,6 +82,10 @@ XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler
XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data, int offset, int length) throws IOException;

XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data, int offset, int length, FilterPath[] includes,
FilterPath[] excludes) throws IOException;

Member:

When I was last here I thought "we really should do something about all these overrides." That's still true. But I don't want to block this change.

Member Author:

There do seem to be a ton of unnecessary overrides that are just there to allow other overrides. I didn't want to tackle that :D

Member:

Yeah. I'd keep them for now. I'll still grumble about them, but I'll have a think about whether we can do something.
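
As an aside on that thread (an editorial sketch, not part of this PR): one way the unfiltered overloads could eventually collapse is a default method on the XContent interface that forwards to the new filtered variant with null paths. A rough sketch, assuming the org.elasticsearch.xcontent package layout visible elsewhere in this diff and a hypothetical interface name:

import java.io.IOException;

import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.support.filtering.FilterPath;

// Hypothetical fragment for illustration; not the real XContent interface.
public interface FilterAwareXContent {

    // Implementations provide only the filtered variant.
    XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
                                byte[] data, int offset, int length,
                                FilterPath[] includes, FilterPath[] excludes) throws IOException;

    // The unfiltered overload forwards with no include/exclude paths, so concrete
    // implementations such as JsonXContent would not need to override it.
    default XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
                                        byte[] data, int offset, int length) throws IOException {
        return createParser(xContentRegistry, deprecationHandler, data, offset, length, null, null);
    }
}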

/**
* Creates a parser over the provided reader.
*/
@@ -112,6 +112,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
}

@Override
public XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
byte[] data,
int offset,
int length,
FilterPath[] includes,
FilterPath[] excludes
) throws IOException {
return new CborXContentParser(
xContentRegistry,
deprecationHandler,
cborFactory.createParser(new ByteArrayInputStream(data, offset, length)),
RestApiVersion.current(),
includes,
excludes
);
}

@Override
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, Reader reader) throws IOException {
@@ -113,6 +113,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
}

@Override
public XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
byte[] data,
int offset,
int length,
FilterPath[] includes,
FilterPath[] excludes
) throws IOException {
return new JsonXContentParser(
xContentRegistry,
deprecationHandler,
jsonFactory.createParser(new ByteArrayInputStream(data, offset, length)),
RestApiVersion.current(),
includes,
excludes
);
}

@Override
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, Reader reader) throws IOException {
@@ -114,6 +114,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
}

@Override
public XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
byte[] data,
int offset,
int length,
FilterPath[] includes,
FilterPath[] excludes
) throws IOException {
return new SmileXContentParser(
xContentRegistry,
deprecationHandler,
smileFactory.createParser(new ByteArrayInputStream(data, offset, length)),
RestApiVersion.current(),
includes,
excludes
);
}

@Override
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, Reader reader) throws IOException {
@@ -106,6 +106,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current());
}

@Override
public XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
byte[] data,
int offset,
int length,
FilterPath[] includes,
FilterPath[] excludes
) throws IOException {
return new YamlXContentParser(
xContentRegistry,
deprecationHandler,
yamlFactory.createParser(new ByteArrayInputStream(data, offset, length)),
RestApiVersion.current(),
includes,
excludes
);
}

@Override
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, Reader reader) throws IOException {
@@ -13,6 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
@@ -27,6 +28,7 @@
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.support.filtering.FilterPath;

import java.io.BufferedInputStream;
import java.io.IOException;
@@ -101,6 +103,14 @@ public static Tuple<XContentType, Map<String, Object>> convertToMap(BytesReference
return convertToMap(bytes, ordered, null);
}

/**
* Exactly the same as {@link XContentHelper#convertToMap(BytesReference, boolean, XContentType, FilterPath[], FilterPath[])} but
* none of the fields are filtered
*/
public static Tuple<XContentType, Map<String, Object>> convertToMap(BytesReference bytes, boolean ordered, XContentType xContentType) {
return convertToMap(bytes, ordered, xContentType, null, null);
}

/**
* Converts the given bytes into a map that is optionally ordered. The provided {@link XContentType} must be non-null.
* <p>
@@ -110,8 +120,13 @@ public static Tuple<XContentType, Map<String, Object>> convertToMap(BytesReference
* frequently when folks write nanosecond precision dates as a decimal
* number.
*/
public static Tuple<XContentType, Map<String, Object>> convertToMap(BytesReference bytes, boolean ordered, XContentType xContentType)
throws ElasticsearchParseException {
public static Tuple<XContentType, Map<String, Object>> convertToMap(
BytesReference bytes,
boolean ordered,
XContentType xContentType,
@Nullable FilterPath[] include,
@Nullable FilterPath[] exclude
) throws ElasticsearchParseException {
try {
final XContentType contentType;
InputStream input;
@@ -129,14 +144,16 @@ public static Tuple<XContentType, Map<String, Object>> convertToMap(BytesReference
final int length = bytes.length();
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(raw, offset, length);
return new Tuple<>(Objects.requireNonNull(contentType),
convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered));
convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered, include, exclude));
} else {
input = bytes.streamInput();
contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input);
}
try (InputStream stream = input) {
return new Tuple<>(Objects.requireNonNull(contentType),
convertToMap(XContentFactory.xContent(contentType), stream, ordered));
return new Tuple<>(
Objects.requireNonNull(contentType),
convertToMap(XContentFactory.xContent(contentType), stream, ordered, include, exclude)
);
}
} catch (IOException e) {
throw new ElasticsearchParseException("Failed to parse content to map", e);
@@ -158,14 +175,35 @@ public static Map<String, Object> convertToMap(XContent xContent, String string,
}

/**
* Convert a string in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any
* error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input.
* The same as {@link XContentHelper#convertToMap(XContent, byte[], int, int, boolean, FilterPath[], FilterPath[])} but none of the
* fields are filtered.
*/
public static Map<String, Object> convertToMap(XContent xContent, InputStream input, boolean ordered)
throws ElasticsearchParseException {
return convertToMap(xContent, input, ordered, null, null);
}

/**
* Convert a string in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any
* error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input.
*
* Additionally, fields may be included or excluded from the parsing.
*/
public static Map<String, Object> convertToMap(
XContent xContent,
InputStream input,
boolean ordered,
@Nullable FilterPath[] include,
@Nullable FilterPath[] exclude
) throws ElasticsearchParseException {
// It is safe to use EMPTY here because this never uses namedObject
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input)) {
try (XContentParser parser = xContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
input,
include,
exclude
)) {
return ordered ? parser.mapOrdered() : parser.map();
} catch (IOException e) {
throw new ElasticsearchParseException("Failed to parse content to map", e);
@@ -178,9 +216,35 @@ public static Map<String, Object> convertToMap(XContent xContent, InputStream input
*/
public static Map<String, Object> convertToMap(XContent xContent, byte[] bytes, int offset, int length, boolean ordered)
throws ElasticsearchParseException {
return convertToMap(xContent, bytes, offset, length, ordered, null, null);
}

/**
* Convert a byte array in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any
* error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input.
*
* Unlike {@link XContentHelper#convertToMap(XContent, byte[], int, int, boolean)} this optionally accepts fields to include or exclude
* during XContent parsing.
*/
public static Map<String, Object> convertToMap(
XContent xContent,
byte[] bytes,
int offset,
int length,
boolean ordered,
@Nullable FilterPath[] include,
@Nullable FilterPath[] exclude
) throws ElasticsearchParseException {
// It is safe to use EMPTY here because this never uses namedObject
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes, offset, length)) {
try (XContentParser parser = xContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
bytes,
offset,
length,
include,
exclude)
) {
Member:

Wow, this is a lot. There has to be a better way... I'm sure there is, but we're in a local minimum of effort here. This is a fine change, but I think I need to add some refactoring here to my personal backlog.

return ordered ? parser.mapOrdered() : parser.map();
} catch (IOException e) {
throw new ElasticsearchParseException("Failed to parse content to map", e);
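
For orientation, a hedged usage sketch of the new filtered convertToMap overload (the "message" field and the wrapper class are illustrative; imports follow the package locations shown in this diff). Only the included path is materialized into the returned map, although the source bytes are still scanned:

import java.util.Map;
import java.util.Set;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.support.filtering.FilterPath;

public final class FilteredSourceExample {

    // Pulls only the "message" values out of a raw _source; other fields are
    // skipped while the map is built.
    static Object messageOnly(BytesReference sourceBytes) {
        FilterPath[] includes = FilterPath.compile(Set.of("message"));
        Tuple<XContentType, Map<String, Object>> typeAndMap =
            XContentHelper.convertToMap(sourceBytes, false, XContentType.JSON, includes, null);
        return typeAndMap.v2().get("message");
    }
}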
@@ -20,6 +20,7 @@
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xcontent.support.filtering.FilterPath;

import java.io.IOException;
import java.util.Collection;
@@ -157,6 +158,44 @@ public List<Object> extractRawValues(String path) {
return XContentMapValues.extractRawValues(path, source());
}

/**
* Returns the values associated with the path. Those are "low" level values, and it can
* handle path expressions where an array/list is navigated within.
*
* The major difference with {@link SourceLookup#extractRawValues(String)} is that this version will:
*
* - not cache the source if it's not already parsed
* - only extract the desired values from the compressed source instead of deserializing the whole object

Re: "deserializing the whole object"

If I read the source correctly it still does; it just filters fields out early, but the whole object is still parsed. That saves cycles, which is great; however, I think this documentation claim is not correct.

Member Author:

It does pull the source object into memory (the compressed bytes), but it only parses out the specific included fields.

Member:

Yeah - the _source is still decompressed, but we can skip good chunks of the JSON parsing. The biggest saving, as far as I can tell, is in not allocating memory to hold the parsed results. There are smaller advantages in that we can skip some cycles around parsing too, but the copying seems like the biggest thing to me.

*
* This is useful when the caller only wants a single value from the source and does not care whether the source is fully parsed and cached
* for later use.
* @param path The path from which to extract the values from source
* @return The list of found values or an empty list if none are found
*/
public List<Object> extractRawValuesWithoutCaching(String path) {
if (source != null) {
return XContentMapValues.extractRawValues(path, source);
}
FilterPath[] filterPaths = FilterPath.compile(Set.of(path));
Member:

These are fast to compile now but I think they'll get slower. Right now they just sort of shift the lists around and do some string manipulation.

I wonder if in the future we should make a

SourcePathLookup lookup(String path)

or something. That'd be neat. It can wait.

if (sourceAsBytes != null) {
return XContentMapValues.extractRawValues(
path,
XContentHelper.convertToMap(sourceAsBytes, false, null, filterPaths, null).v2()
);
}
try {
FieldsVisitor sourceFieldVisitor = new FieldsVisitor(true);
fieldReader.accept(docId, sourceFieldVisitor);
BytesReference source = sourceFieldVisitor.source();
return XContentMapValues.extractRawValues(
path,
XContentHelper.convertToMap(source, false, null, filterPaths, null).v2()
);
} catch (Exception e) {
throw new ElasticsearchParseException("failed to parse / load source", e);
}
}
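
A hedged usage sketch of extractRawValuesWithoutCaching (the "message" field, the leaf-context wiring, and the org.elasticsearch.search.lookup package are assumptions for illustration). Unlike extractRawValues(String), nothing is cached on the lookup:

import java.util.List;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.lookup.SourceLookup;

final class UncachedExtractionExample {

    // Extracts only the "message" values for one document without caching the
    // fully parsed source map on the lookup.
    static List<Object> messagesFor(SourceLookup sourceLookup, LeafReaderContext ctx, int doc) {
        sourceLookup.setSegmentAndDocument(ctx, doc);
        return sourceLookup.extractRawValuesWithoutCaching("message");
    }
}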

/**
* For the provided path, return its value in the source.
*
@@ -183,7 +183,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {

private void collectFromSource(int doc, long owningBucketOrd, CategorizationTokenTree categorizer) throws IOException {
sourceLookup.setSegmentAndDocument(ctx, doc);
Iterator<String> itr = sourceLookup.extractRawValues(sourceFieldName).stream().map(obj -> {
Iterator<String> itr = sourceLookup.extractRawValuesWithoutCaching(sourceFieldName).stream().map(obj -> {
if (obj == null) {
return null;
}