Skip to content

Commit

Permalink
Reduce memory usage in field-caps responses (#88042)
Browse files Browse the repository at this point in the history
We have reduced the memory usage of field-caps requests targeting many 
indices in 8.2+ (see #83494). Unfortunately, we still receive OOM
reports in 7.17. I think we should push some contained improvements to
reduce the memory usage for those requests in 7.17. I have looked into
several options. This PR reduces the memory usage of field-caps
responses by replacing HashMap with ArrayList for the field responses to
eliminate duplicated string names and internal nodes of Map.
  • Loading branch information
dnhatn authored Jun 29, 2022
1 parent 5e2915c commit 95b23fc
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 50 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/88042.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88042
summary: Reduce memory usage in field-caps responses
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest
);

if (canMatchShard(request, searchExecutionContext) == false) {
return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false);
return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyList(), false);
}

Set<String> fieldNames = new HashSet<>();
Expand Down Expand Up @@ -118,7 +118,7 @@ public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest
}
}
}
return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true);
return new FieldCapabilitiesIndexResponse(request.index(), responseMap.values(), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,29 @@
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private final String indexName;
private final Map<String, IndexFieldCapabilities> responseMap;
private final Collection<IndexFieldCapabilities> fields;
private final boolean canMatch;
private final transient Version originVersion;

FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap, boolean canMatch) {
FieldCapabilitiesIndexResponse(String indexName, Collection<IndexFieldCapabilities> fields, boolean canMatch) {
this.indexName = indexName;
this.responseMap = responseMap;
this.fields = fields;
this.canMatch = canMatch;
this.originVersion = Version.CURRENT;
}

FieldCapabilitiesIndexResponse(StreamInput in) throws IOException {
super(in);
this.indexName = in.readString();
this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
this.fields = readFields(in);
this.canMatch = in.getVersion().onOrAfter(Version.V_7_9_0) ? in.readBoolean() : true;
this.originVersion = in.getVersion();
}
Expand All @@ -51,18 +54,35 @@ public boolean canMatch() {
}

/**
* Get the field capabilities map
* Get the field capabilities
*/
public Map<String, IndexFieldCapabilities> get() {
return responseMap;
public Collection<IndexFieldCapabilities> getFields() {
return fields;
}

/**
*
* Get the field capabilities for the provided {@code field}
*/
public IndexFieldCapabilities getField(String field) {
return responseMap.get(field);
private static Collection<IndexFieldCapabilities> readFields(StreamInput in) throws IOException {
    // The fields used to be serialized as a map keyed by field name; the stream is still consumed in that
    // shape: an entry count, then for each entry the field name followed by the field-caps itself.
    final int count = in.readVInt();
    if (count == 0) {
        return Collections.emptyList();
    }
    final List<IndexFieldCapabilities> result = new ArrayList<>(count);
    int remaining = count;
    while (remaining-- > 0) {
        // The key is only checked against the name carried by the value and is not retained afterwards.
        final String key = in.readString();
        final IndexFieldCapabilities caps = new IndexFieldCapabilities(in);
        assert key.equals(caps.getName()) : key + " != " + caps.getName();
        result.add(caps);
    }
    return result;
}

private static void writeFields(StreamOutput out, Collection<IndexFieldCapabilities> fields) throws IOException {
    // The fields used to be serialized as a map keyed by field name, so each entry is still emitted as
    // its name followed by the field-caps, preceded by the total number of entries.
    final int count = fields.size();
    out.writeVInt(count);
    for (final IndexFieldCapabilities caps : fields) {
        final String key = caps.getName();
        out.writeString(key);
        caps.writeTo(out);
    }
}

Version getOriginVersion() {
Expand All @@ -72,7 +92,7 @@ Version getOriginVersion() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
writeFields(out, fields);
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeBoolean(canMatch);
}
Expand All @@ -83,11 +103,11 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o;
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(responseMap, that.responseMap);
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(fields, that.fields);
}

@Override
public int hashCode() {
return Objects.hash(indexName, responseMap, canMatch);
return Objects.hash(indexName, fields, canMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch()));
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.getFields(), resp.canMatch()));
}
for (FieldCapabilitiesFailure failure : response.getFailures()) {
Exception ex = failure.getException();
Expand Down Expand Up @@ -265,17 +265,16 @@ private void innerMerge(
Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder,
FieldCapabilitiesIndexResponse response
) {
for (Map.Entry<String, IndexFieldCapabilities> entry : response.get().entrySet()) {
final String field = entry.getKey();
for (IndexFieldCapabilities fieldCap : response.getFields()) {
final String fieldName = fieldCap.getName();
// best effort to detect metadata field coming from older nodes
final boolean isMetadataField = response.getOriginVersion().onOrAfter(Version.V_7_13_0)
? entry.getValue().isMetadatafield()
: metadataFieldPred.test(field);
final IndexFieldCapabilities fieldCap = entry.getValue();
Map<String, FieldCapabilities.Builder> typeMap = responseMapBuilder.computeIfAbsent(field, f -> new HashMap<>());
? fieldCap.isMetadatafield()
: metadataFieldPred.test(fieldName);
Map<String, FieldCapabilities.Builder> typeMap = responseMapBuilder.computeIfAbsent(fieldName, f -> new HashMap<>());
FieldCapabilities.Builder builder = typeMap.computeIfAbsent(
fieldCap.getType(),
key -> new FieldCapabilities.Builder(field, key)
key -> new FieldCapabilities.Builder(fieldName, key)
);
builder.add(response.getIndexName(), isMetadataField, fieldCap.isSearchable(), fieldCap.isAggregatable(), fieldCap.meta());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponseTests.randomFieldCaps;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

/**
 * Wire-serialization tests for {@link FieldCapabilitiesIndexResponse}, together with the random-instance
 * factories that other field-caps tests import statically.
 */
public class FieldCapabilitiesIndexResponseTests extends AbstractWireSerializingTestCase<FieldCapabilitiesIndexResponse> {

    /**
     * Builds a response for a random 10-letter ASCII index name with a random {@code canMatch} flag.
     */
    static FieldCapabilitiesIndexResponse randomIndexResponse() {
        final String index = randomAsciiLettersOfLength(10);
        final boolean canMatch = randomBoolean();
        return randomIndexResponse(index, canMatch);
    }

    /**
     * Builds a response for the given index. Random field capabilities are generated only when
     * {@code canMatch} is true; otherwise the response carries an empty field list.
     */
    static FieldCapabilitiesIndexResponse randomIndexResponse(String index, boolean canMatch) {
        final List<IndexFieldCapabilities> caps = new ArrayList<>();
        if (canMatch) {
            final String[] names = generateRandomStringArray(5, 10, false, true);
            assertNotNull(names);
            for (int i = 0; i < names.length; i++) {
                caps.add(randomFieldCaps(names[i]));
            }
        }
        return new FieldCapabilitiesIndexResponse(index, caps, canMatch);
    }

    @Override
    protected Writeable.Reader<FieldCapabilitiesIndexResponse> instanceReader() {
        return FieldCapabilitiesIndexResponse::new;
    }

    @Override
    protected FieldCapabilitiesIndexResponse createTestInstance() {
        return randomIndexResponse();
    }

    /**
     * Decodes a fixed, pre-encoded payload and checks that it is read back as the expected index name,
     * {@code canMatch} flag and set of fields (in any order).
     */
    public void testDeserializeFromBase64() throws Exception {
        final String encoded = "CWxvZ3MtMTAwMQMGcGVyaW9kBnBlcmlvZARsb25nAQABAQR1bml0BnNlY29uZApAdGltZXN0"
            + "YW1wCkB0aW1lc3RhbXAEZGF0ZQEBAAAHbWVzc2FnZQdtZXNzYWdlBHRleHQAAQAAAQAAAAAAAAAAAA==";
        final byte[] payload = Base64.getDecoder().decode(encoded);
        final FieldCapabilitiesIndexResponse decoded = new FieldCapabilitiesIndexResponse(StreamInput.wrap(payload));

        final IndexFieldCapabilities timestamp = new IndexFieldCapabilities("@timestamp", "date", true, true, false, Collections.emptyMap());
        final IndexFieldCapabilities message = new IndexFieldCapabilities("message", "text", false, true, false, Collections.emptyMap());
        final IndexFieldCapabilities period = new IndexFieldCapabilities(
            "period",
            "long",
            true,
            false,
            true,
            Collections.singletonMap("unit", "second")
        );

        assertTrue(decoded.canMatch());
        assertThat(decoded.getIndexName(), equalTo("logs-1001"));
        assertThat(decoded.getFields(), containsInAnyOrder(timestamp, message, period));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
import java.util.List;
import java.util.Set;

import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponseTests.randomIndexResponse;

public class FieldCapabilitiesNodeResponseTests extends AbstractWireSerializingTestCase<FieldCapabilitiesNodeResponse> {

@Override
protected FieldCapabilitiesNodeResponse createTestInstance() {
List<FieldCapabilitiesIndexResponse> responses = new ArrayList<>();
int numResponse = randomIntBetween(0, 10);
for (int i = 0; i < numResponse; i++) {
responses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse());
responses.add(randomIndexResponse());
}
int numUnmatched = randomIntBetween(0, 3);
Set<ShardId> shardIds = new HashSet<>();
Expand All @@ -46,15 +48,15 @@ protected FieldCapabilitiesNodeResponse mutateInstance(FieldCapabilitiesNodeResp
int mutation = response.getIndexResponses().isEmpty() ? 0 : randomIntBetween(0, 2);
switch (mutation) {
case 0:
newResponses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse());
newResponses.add(randomIndexResponse());
break;
case 1:
int toRemove = randomInt(newResponses.size() - 1);
newResponses.remove(toRemove);
break;
case 2:
int toReplace = randomInt(newResponses.size() - 1);
newResponses.set(toReplace, FieldCapabilitiesResponseTests.createRandomIndexResponse());
newResponses.set(toReplace, randomIndexResponse());
break;
}
return new FieldCapabilitiesNodeResponse(newResponses, Collections.emptyMap(), response.getUnmatchedShardIds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.List;
import java.util.Map;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;

public class FieldCapabilitiesResponseTests extends AbstractWireSerializingTestCase<FieldCapabilitiesResponse> {

@Override
Expand All @@ -40,7 +38,7 @@ protected FieldCapabilitiesResponse createTestInstance() {
int numResponse = randomIntBetween(0, 10);

for (int i = 0; i < numResponse; i++) {
responses.add(createRandomIndexResponse());
responses.add(FieldCapabilitiesIndexResponseTests.randomIndexResponse());
}
randomResponse = new FieldCapabilitiesResponse(responses, Collections.emptyList());
return randomResponse;
Expand All @@ -51,22 +49,6 @@ protected Writeable.Reader<FieldCapabilitiesResponse> instanceReader() {
return FieldCapabilitiesResponse::new;
}

public static FieldCapabilitiesIndexResponse createRandomIndexResponse() {
return randomIndexResponse(randomAsciiLettersOfLength(10), randomBoolean());
}

public static FieldCapabilitiesIndexResponse randomIndexResponse(String index, boolean canMatch) {
Map<String, IndexFieldCapabilities> responses = new HashMap<>();

String[] fields = generateRandomStringArray(5, 10, false, true);
assertNotNull(fields);

for (String field : fields) {
responses.put(field, randomFieldCaps(field));
}
return new FieldCapabilitiesIndexResponse(index, responses, canMatch);
}

public static IndexFieldCapabilities randomFieldCaps(String fieldName) {
Map<String, String> meta;
switch (randomInt(2)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponseTests.randomIndexResponse;
import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponseTests.randomIndexResponse;
import static org.elasticsearch.action.fieldcaps.RequestDispatcher.GROUP_REQUESTS_VERSION;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
Expand Down

0 comments on commit 95b23fc

Please sign in to comment.