Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory efficient xcontent filtering #77154

Merged
merged 13 commits into from
Sep 13, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package org.elasticsearch.benchmark.search.fetch.subphase;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
import org.elasticsearch.search.lookup.SourceLookup;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class FetchSourcePhaseBenchmark {
private BytesReference sourceBytes;
private FetchSourceContext fetchContext;
private Set<String> includesSet;
private Set<String> excludesSet;
private FilterPath[] includesFilters;
private FilterPath[] excludesFilters;

@Param({ "tiny", "short", "one_4k_field", "one_4m_field" })
private String source;
@Param({ "message" })
private String includes;
@Param({ "" })
private String excludes;

@Setup
public void setup() throws IOException {
switch (source) {
case "tiny":
sourceBytes = new BytesArray("{\"message\": \"short\"}");
break;
case "short":
sourceBytes = read300BytesExample();
break;
case "one_4k_field":
sourceBytes = buildBigExample("huge".repeat(1024));
break;
case "one_4m_field":
sourceBytes = buildBigExample("huge".repeat(1024 * 1024));
break;
default:
throw new IllegalArgumentException("Unknown source [" + source + "]");
}
fetchContext = new FetchSourceContext(
true,
Strings.splitStringByCommaToArray(includes),
Strings.splitStringByCommaToArray(excludes)
);
includesSet = Set.of(fetchContext.includes());
excludesSet = Set.of(fetchContext.excludes());
includesFilters = FilterPath.compile(Set.of(fetchContext.includes()));
excludesFilters = FilterPath.compile(Set.of(fetchContext.excludes()));
}

private BytesReference read300BytesExample() throws IOException {
return Streams.readFully(FetchSourcePhaseBenchmark.class.getResourceAsStream("300b_example.json"));
}

private BytesReference buildBigExample(String extraText) throws IOException {
String bigger = read300BytesExample().utf8ToString();
bigger = "{\"huge\": \"" + extraText + "\"," + bigger.substring(1);
return new BytesArray(bigger);
}

@Benchmark
public BytesReference filterObjects() throws IOException {
SourceLookup lookup = new SourceLookup();
lookup.setSource(sourceBytes);
Object value = lookup.filter(fetchContext);
return FetchSourcePhase.objectToBytes(value, XContentType.JSON, Math.min(1024, lookup.internalSourceRef().length()));
}

@Benchmark
public BytesReference filterXContentOnParser() throws IOException {
BytesStreamOutput streamOutput = new BytesStreamOutput(Math.min(1024, sourceBytes.length()));
XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), streamOutput);
try (
XContentParser parser = XContentType.JSON.xContent()
.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
sourceBytes.streamInput(),
includesFilters,
excludesFilters
)
) {
builder.copyCurrentStructure(parser);
return BytesReference.bytes(builder);
}
}

@Benchmark
public BytesReference filterXContentOnBuilder() throws IOException {
BytesStreamOutput streamOutput = new BytesStreamOutput(Math.min(1024, sourceBytes.length()));
XContentBuilder builder = new XContentBuilder(
XContentType.JSON.xContent(),
streamOutput,
includesSet,
excludesSet,
XContentType.JSON.toParsedMediaType()
);
try (
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, sourceBytes.streamInput())
) {
builder.copyCurrentStructure(parser);
return BytesReference.bytes(builder);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"@timestamp": "2099-11-15T14:12:12",
"http": {
"request": {
"method": "get"
},
"response": {
"bytes": 1070000,
"status_code": 200
},
"version": "1.1"
},
"message": "GET /search HTTP/1.1 200 1070000",
"source": {
"ip": "192.168.0.1"
},
"user": {
"id": "user"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.core.RestApiVersion;

import java.io.IOException;
Expand Down Expand Up @@ -58,6 +59,17 @@ XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationH
XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, InputStream is)
throws IOException;

/**
* Creates a parser over the provided input stream.
*/
XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
InputStream is,
FilterPath[] includes,
FilterPath[] excludes
) throws IOException;

/**
* Creates a parser over the provided bytes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import org.elasticsearch.core.RestApiVersion;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
Expand All @@ -21,6 +21,8 @@
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.core.RestApiVersion;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -80,6 +82,24 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(is));
}

@Override
public XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
InputStream is,
FilterPath[] includes,
FilterPath[] excludes
) throws IOException {
return new CborXContentParser(
xContentRegistry,
deprecationHandler,
cborFactory.createParser(is),
RestApiVersion.current(),
includes,
excludes
);
}

@Override
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContentParser;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;

public class CborXContentParser extends JsonXContentParser {

Expand All @@ -28,6 +29,17 @@ public CborXContentParser(NamedXContentRegistry xContentRegistry,
super(xContentRegistry, deprecationHandler, parser, restApiVersion);
}

public CborXContentParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
JsonParser parser,
RestApiVersion restApiVersion,
FilterPath[] includes,
FilterPath[] excludes
) {
super(xContentRegistry, deprecationHandler, parser, restApiVersion, includes, excludes);
}

@Override
public XContentType contentType() {
return XContentType.CBOR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import org.elasticsearch.core.RestApiVersion;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.core.RestApiVersion;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -81,6 +83,24 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(is));
}

@Override
public XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
InputStream is,
FilterPath[] include,
FilterPath[] exclude
) throws IOException {
return new JsonXContentParser(
xContentRegistry,
deprecationHandler,
jsonFactory.createParser(is),
RestApiVersion.current(),
include,
exclude
);
}

@Override
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.core.RestApiVersion;
import com.fasterxml.jackson.core.filter.FilteringParserDelegate;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.AbstractXContentParser;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.common.xcontent.support.filtering.FilterPathBasedFilter;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.IOException;
Expand All @@ -39,6 +43,25 @@ public JsonXContentParser(NamedXContentRegistry xContentRegistry,
this.parser = parser;
}

public JsonXContentParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
JsonParser parser,
RestApiVersion restApiVersion,
FilterPath[] include,
FilterPath[] exclude
) {
super(xContentRegistry, deprecationHandler, restApiVersion);
JsonParser filtered = parser;
if (exclude != null) {
filtered = new FilteringParserDelegate(filtered, new FilterPathBasedFilter(exclude, false), true, true);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably fail in hasDoubleWildcard. Unless I can figure out a way to make it work!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this test.

}
if (include != null) {
filtered = new FilteringParserDelegate(filtered, new FilterPathBasedFilter(include, true), true, true);
}
this.parser = filtered;
}

@Override
public XContentType contentType() {
return XContentType.JSON;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import org.elasticsearch.core.RestApiVersion;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPath;
import org.elasticsearch.core.RestApiVersion;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -82,6 +84,24 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry,
return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(is));
}

@Override
public XContentParser createParser(
NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler,
InputStream is,
FilterPath[] include,
FilterPath[] exclude
) throws IOException {
return new SmileXContentParser(
xContentRegistry,
deprecationHandler,
smileFactory.createParser(is),
RestApiVersion.current(),
include,
exclude
);
}

@Override
public XContentParser createParser(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data) throws IOException {
Expand Down
Loading