Memory efficient xcontent filtering (backport of elastic#77154)
I found myself needing support for something like `filter_path` on
`XContentParser`. It was simple enough to plug in so I did. Then I
realized that it might offer more memory-efficient source filtering
(elastic#25168), so I put together a quick benchmark comparing it with
the source filtering that we do in `_search`.

Filtering using the parser is about 33% faster than how we filter now
when you select a single field from a 300-byte document:
```
Benchmark                                          (excludes)  (includes)  (source)  Mode  Cnt     Score    Error  Units
FetchSourcePhaseBenchmark.filterObjects                           message     short  avgt    5  2360.342 ±  4.715  ns/op
FetchSourcePhaseBenchmark.filterXContentOnBuilder                 message     short  avgt    5  2010.278 ± 15.042  ns/op
FetchSourcePhaseBenchmark.filterXContentOnParser                  message     short  avgt    5  1588.446 ± 18.593  ns/op
```

The top line is the way we filter now. The middle line is adding a
filter to `XContentBuilder` - something we can do right now without any
of my plumbing work. The bottom line is filtering on the parser,
requiring all the new plumbing.

The 33% isn't particularly impressive on its own. It *sounds* great! But
700 nanoseconds per document isn't going to cut into anyone's search
times. If you fetch a thousand documents that's 0.7 milliseconds of
savings.

But we mostly advise folks to use source filtering on fetch when the
source is large and you only want a small part of it. So I tried it when
the source is about 4.3kB and you want a single field:
```
Benchmark                                          (excludes)  (includes)      (source)  Mode  Cnt     Score     Error  Units
FetchSourcePhaseBenchmark.filterObjects                           message  one_4k_field  avgt    5  5957.128 ± 117.402  ns/op
FetchSourcePhaseBenchmark.filterXContentOnBuilder                 message  one_4k_field  avgt    5  4999.073 ±  96.003  ns/op
FetchSourcePhaseBenchmark.filterXContentOnParser                  message  one_4k_field  avgt    5  3261.478 ±  48.879  ns/op
```

That's 45% faster. Put another way, it saves 2.7 microseconds per
document. Not bad!

But have a look at how things come out when you want a single field from
a 4 *megabyte* document:
```
Benchmark                                          (excludes)  (includes)      (source)  Mode  Cnt        Score        Error  Units
FetchSourcePhaseBenchmark.filterObjects                           message  one_4m_field  avgt    5  8266343.036 ± 176197.077  ns/op
FetchSourcePhaseBenchmark.filterXContentOnBuilder                 message  one_4m_field  avgt    5  6227560.013 ±  68306.318  ns/op
FetchSourcePhaseBenchmark.filterXContentOnParser                  message  one_4m_field  avgt    5  1617153.472 ±  80164.547  ns/op
```

These documents are very large. I've encountered documents like them in
real life, but they've always been outliers for me. Still, a 6.5
millisecond per document savings ain't anything to sneeze at.

Take a look at what you get when I turn on GC metrics:
```
FetchSourcePhaseBenchmark.filterObjects                          message  one_4m_field  avgt    5   7036097.561 ±  84721.312   ns/op
FetchSourcePhaseBenchmark.filterObjects:·gc.alloc.rate           message  one_4m_field  avgt    5      2166.613 ±     25.975  MB/sec
FetchSourcePhaseBenchmark.filterXContentOnBuilder                message  one_4m_field  avgt    5   6104595.992 ±  55445.508   ns/op
FetchSourcePhaseBenchmark.filterXContentOnBuilder:·gc.alloc.rate message  one_4m_field  avgt    5      2496.978 ±     22.650  MB/sec
FetchSourcePhaseBenchmark.filterXContentOnParser                 message  one_4m_field  avgt    5   1614980.846 ±  31716.956   ns/op
FetchSourcePhaseBenchmark.filterXContentOnParser:·gc.alloc.rate  message  one_4m_field  avgt    5         1.755 ±      0.035  MB/sec
```
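
The parser-based filtering allocates about 1.8MB/sec where the other two
approaches each allocate more than 2GB/sec, because the filtered parser
never surfaces the parts of the document that the filter drops. That's
the "memory efficient" in the title.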
nik9000 committed Sep 13, 2021
1 parent 4ec48a0 commit ca3df19
Showing 15 changed files with 393 additions and 18 deletions.
org/elasticsearch/benchmark/search/fetch/subphase/FetchSourcePhaseBenchmark.java
@@ -0,0 +1,138 @@
package org.elasticsearch.benchmark.search.fetch.subphase;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
import org.elasticsearch.search.lookup.SourceLookup;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
/**
 * Benchmarks the three ways of filtering the fetched {@code _source}:
 * filtering it as a map of maps like {@code _search} does today, filtering
 * it on the {@link XContentBuilder} while writing, and filtering it on the
 * {@link XContentParser} while reading.
 */
public class FetchSourcePhaseBenchmark {
    private BytesReference sourceBytes;
    private FetchSourceContext fetchContext;
    private Set<String> includesSet;
    private Set<String> excludesSet;
    private FilterPath[] includesFilters;
    private FilterPath[] excludesFilters;

    @Param({ "tiny", "short", "one_4k_field", "one_4m_field" })
    private String source;
    @Param({ "message" })
    private String includes;
    @Param({ "" })
    private String excludes;

    @Setup
    public void setup() throws IOException {
        switch (source) {
            case "tiny":
                sourceBytes = new BytesArray("{\"message\": \"short\"}");
                break;
            case "short":
                sourceBytes = read300BytesExample();
                break;
            case "one_4k_field":
                sourceBytes = buildBigExample("huge".repeat(1024));
                break;
            case "one_4m_field":
                sourceBytes = buildBigExample("huge".repeat(1024 * 1024));
                break;
            default:
                throw new IllegalArgumentException("Unknown source [" + source + "]");
        }
        fetchContext = new FetchSourceContext(
            true,
            Strings.splitStringByCommaToArray(includes),
            Strings.splitStringByCommaToArray(excludes)
        );
        includesSet = Set.of(fetchContext.includes());
        excludesSet = Set.of(fetchContext.excludes());
        includesFilters = FilterPath.compile(Set.of(fetchContext.includes()));
        excludesFilters = FilterPath.compile(Set.of(fetchContext.excludes()));
    }

    private BytesReference read300BytesExample() throws IOException {
        return Streams.readFully(FetchSourcePhaseBenchmark.class.getResourceAsStream("300b_example.json"));
    }

    private BytesReference buildBigExample(String extraText) throws IOException {
        String bigger = read300BytesExample().utf8ToString();
        bigger = "{\"huge\": \"" + extraText + "\"," + bigger.substring(1);
        return new BytesArray(bigger);
    }

    /** Filter the way {@code _search} does today: on a map of maps. */
    @Benchmark
    public BytesReference filterObjects() throws IOException {
        SourceLookup lookup = new SourceLookup();
        lookup.setSource(sourceBytes);
        Object value = lookup.filter(fetchContext);
        return FetchSourcePhase.objectToBytes(value, XContentType.JSON, Math.min(1024, lookup.internalSourceRef().length()));
    }

    /** Filter while reading: the parser drops tokens that don't match. */
    @Benchmark
    public BytesReference filterXContentOnParser() throws IOException {
        BytesStreamOutput streamOutput = new BytesStreamOutput(Math.min(1024, sourceBytes.length()));
        XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), streamOutput);
        try (
            XContentParser parser = XContentType.JSON.xContent()
                .createParser(
                    NamedXContentRegistry.EMPTY,
                    DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
                    sourceBytes.streamInput(),
                    includesFilters,
                    excludesFilters
                )
        ) {
            builder.copyCurrentStructure(parser);
            return BytesReference.bytes(builder);
        }
    }

    /** Filter while writing: parse everything, let the builder drop fields. */
    @Benchmark
    public BytesReference filterXContentOnBuilder() throws IOException {
        BytesStreamOutput streamOutput = new BytesStreamOutput(Math.min(1024, sourceBytes.length()));
        XContentBuilder builder = new XContentBuilder(
            XContentType.JSON.xContent(),
            streamOutput,
            includesSet,
            excludesSet,
            XContentType.JSON.toParsedMediaType()
        );
        try (
            XContentParser parser = XContentType.JSON.xContent()
                .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, sourceBytes.streamInput())
        ) {
            builder.copyCurrentStructure(parser);
            return BytesReference.bytes(builder);
        }
    }
}
org/elasticsearch/benchmark/search/fetch/subphase/300b_example.json
@@ -0,0 +1,20 @@
{
  "@timestamp": "2099-11-15T14:12:12",
  "http": {
    "request": {
      "method": "get"
    },
    "response": {
      "bytes": 1070000,
      "status_code": 200
    },
    "version": "1.1"
  },
  "message": "GET /search HTTP/1.1 200 1070000",
  "source": {
    "ip": "192.168.0.1"
  },
  "user": {
    "id": "user"
  }
}
org/elasticsearch/common/xcontent/XContent.java
@@ -8,6 +8,8 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.common.xcontent.support.filtering.FilterPath;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -56,6 +58,17 @@ XContentParser createParser(NamedXContentRegistry xContentRegistry,
    XContentParser createParser(NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler, InputStream is) throws IOException;

    /**
     * Creates a parser over the provided input stream that filters the tokens
     * it emits using the provided include and exclude {@link FilterPath}s.
     */
    XContentParser createParser(
        NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler,
        InputStream is,
        FilterPath[] includes,
        FilterPath[] excludes
    ) throws IOException;

    /**
     * Creates a parser over the provided bytes.
     */
org/elasticsearch/common/xcontent/cbor/CborXContent.java
@@ -12,6 +12,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
@@ -20,6 +21,7 @@
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;

import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -79,6 +81,23 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
        return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(is));
    }

    @Override
    public XContentParser createParser(
        NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler,
        InputStream is,
        FilterPath[] includes,
        FilterPath[] excludes
    ) throws IOException {
        return new CborXContentParser(
            xContentRegistry,
            deprecationHandler,
            cborFactory.createParser(is),
            includes,
            excludes
        );
    }

    @Override
    public XContentParser createParser(NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler, byte[] data) throws IOException {
org/elasticsearch/common/xcontent/cbor/CborXContentParser.java
@@ -9,10 +9,12 @@
package org.elasticsearch.common.xcontent.cbor;

import com.fasterxml.jackson.core.JsonParser;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContentParser;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;

public class CborXContentParser extends JsonXContentParser {

@@ -21,6 +23,16 @@ public CborXContentParser(NamedXContentRegistry xContentRegistry,
        super(xContentRegistry, deprecationHandler, parser);
    }

    public CborXContentParser(
        NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler,
        JsonParser parser,
        FilterPath[] includes,
        FilterPath[] excludes
    ) {
        super(xContentRegistry, deprecationHandler, parser, includes, excludes);
    }

    @Override
    public XContentType contentType() {
        return XContentType.CBOR;
org/elasticsearch/common/xcontent/json/JsonXContent.java
@@ -12,13 +12,15 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;

import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -80,6 +82,23 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
        return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(is));
    }

    @Override
    public XContentParser createParser(
        NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler,
        InputStream is,
        FilterPath[] include,
        FilterPath[] exclude
    ) throws IOException {
        return new JsonXContentParser(
            xContentRegistry,
            deprecationHandler,
            jsonFactory.createParser(is),
            include,
            exclude
        );
    }

    @Override
    public XContentParser createParser(NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler, byte[] data) throws IOException {
org/elasticsearch/common/xcontent/json/JsonXContentParser.java
@@ -11,11 +11,15 @@
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.filter.FilteringParserDelegate;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.AbstractXContentParser;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.common.xcontent.support.filtering.FilterPathBasedFilter;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.IOException;
@@ -31,6 +35,30 @@ public JsonXContentParser(NamedXContentRegistry xContentRegistry,
        this.parser = parser;
    }

    public JsonXContentParser(
        NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler,
        JsonParser parser,
        FilterPath[] include,
        FilterPath[] exclude
    ) {
        super(xContentRegistry, deprecationHandler);
        JsonParser filtered = parser;
        if (exclude != null) {
            for (FilterPath e : exclude) {
                if (e.hasDoubleWildcard()) {
                    // Fixed in Jackson 2.13 - https://github.com/FasterXML/jackson-core/issues/700
                    throw new UnsupportedOperationException("double wildcards are not supported in filtered excludes");
                }
            }
            filtered = new FilteringParserDelegate(filtered, new FilterPathBasedFilter(exclude, false), true, true);
        }
        if (include != null) {
            // Stacked on top of the exclude delegate, so excludes filter the raw token stream first.
            filtered = new FilteringParserDelegate(filtered, new FilterPathBasedFilter(include, true), true, true);
        }
        this.parser = filtered;
    }

    @Override
    public XContentType contentType() {
        return XContentType.JSON;
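A note on the double-wildcard guard in the constructor above: until the
Jackson 2.13 fix lands, exclude filters containing `**` misbehave, so the
constructor rejects them up front. A hypothetical caller, for illustration
(assuming `FilterPath.compile` accepts `**` patterns the way `filter_path`
does):
```java
// Throws UnsupportedOperationException: double wildcards are not
// supported in filtered excludes until the Jackson 2.13 fix.
FilterPath[] excludes = FilterPath.compile(Set.of("http.**.status_code"));
XContentParser parser = XContentType.JSON.xContent()
    .createParser(
        NamedXContentRegistry.EMPTY,
        DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
        source.streamInput(), // source is a hypothetical BytesReference
        null, // no includes
        excludes
    );
```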
org/elasticsearch/common/xcontent/smile/SmileXContent.java
@@ -13,13 +13,15 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;

import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -81,6 +83,23 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
        return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(is));
    }

    @Override
    public XContentParser createParser(
        NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler,
        InputStream is,
        FilterPath[] include,
        FilterPath[] exclude
    ) throws IOException {
        return new SmileXContentParser(
            xContentRegistry,
            deprecationHandler,
            smileFactory.createParser(is),
            include,
            exclude
        );
    }

    @Override
    public XContentParser createParser(NamedXContentRegistry xContentRegistry,
        DeprecationHandler deprecationHandler, byte[] data) throws IOException {