From 11254c0f7ac2397435935bca7f43b25d31d1ecf2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 13 Oct 2021 18:09:58 -0400 Subject: [PATCH] Reduce field caps internal response memory usage and wire size --- .../FieldCapabilitiesIndexResponse.java | 94 +++++++++++++++++++ .../fieldcaps/FieldCapabilitiesResponse.java | 4 +- .../common/io/stream/FilterStreamOutput.java | 44 +++++++++ 3 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamOutput.java diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java index f592b57f76bab..c9b8dc59ea1ac 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java @@ -8,13 +8,20 @@ package org.elasticsearch.action.fieldcaps; +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; +import com.carrotsearch.hppc.cursors.ObjectIntCursor; + import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.FilterStreamInput; +import org.elasticsearch.common.io.stream.FilterStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -93,4 +100,91 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(indexName, responseMap, canMatch); } + + private static void addStringToDict(ObjectIntMap dictionary, String s) { + dictionary.putOrAdd(s, dictionary.size(), 0); + } + + private static ObjectIntMap collectStringLiterals(List responses) { + final ObjectIntMap dict = new ObjectIntHashMap<>(); + for (FieldCapabilitiesIndexResponse response : responses) { + addStringToDict(dict, response.getIndexName()); + for (Map.Entry fieldEntry : response.responseMap.entrySet()) { + addStringToDict(dict, fieldEntry.getKey()); + final IndexFieldCapabilities fieldCap = fieldEntry.getValue(); + addStringToDict(dict, fieldCap.getName()); + addStringToDict(dict, fieldCap.getType()); + for (Map.Entry e : fieldCap.meta().entrySet()) { + addStringToDict(dict, e.getKey()); + addStringToDict(dict, e.getValue()); + } + } + } + return dict; + } + + static void writeResponses(StreamOutput output, List responses) throws IOException { + if (output.getVersion().onOrAfter(Version.V_8_0_0)) { + final boolean withOrdinals = responses.size() > 1; + output.writeBoolean(withOrdinals); + if (withOrdinals) { + final ObjectIntMap dictionary = collectStringLiterals(responses); + final String[] inverseTable = new String[dictionary.size()]; + for (ObjectIntCursor cursor : dictionary) { + inverseTable[cursor.value] = cursor.key; + } + output.writeStringArray(inverseTable); + output = new StreamOutputWithOrdinals(output, dictionary); + } + } + output.writeList(responses); + } + + static List readResponses(StreamInput in) throws IOException { + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + final boolean withOrdinals = in.readBoolean(); + if (withOrdinals) { + final String[] lookupTable = in.readStringArray(); + in = new StreamInputWithOrdinals(in, lookupTable); + } + } + return in.readList(FieldCapabilitiesIndexResponse::new); + } + + private static class StreamOutputWithOrdinals extends FilterStreamOutput { + private final ObjectIntMap dictionary; + StreamOutputWithOrdinals(StreamOutput out, ObjectIntMap dictionary) { + super(out); + this.dictionary = dictionary; + } + + @Override + public void writeString(String str) throws IOException { + final int index = dictionary.getOrDefault(str, -1); + if (index == -1) { + assert false : "string value [" + str + " wasn't added to the dictionary"; + throw new IllegalStateException("String value [" + str + "] was added to the dictionary"); + } + super.writeVInt(index); + } + } + + private static class StreamInputWithOrdinals extends FilterStreamInput { + private final String[] lookupTable; + + StreamInputWithOrdinals(StreamInput in, String[] lookupTable) { + super(in); + this.lookupTable = lookupTable; + } + + @Override + public String readString() throws IOException { + final int index = readVInt(); + if (index < 0 || index >= lookupTable.length) { + assert false : "index " + index + " table length = " + lookupTable.length; + throw new IndexOutOfBoundsException(index); + } + return lookupTable[index]; + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java index 946b4d1272784..11692f750bda7 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java @@ -71,7 +71,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException { super(in); indices = in.readStringArray(); this.responseMap = in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField); - indexResponses = in.readList(FieldCapabilitiesIndexResponse::new); + indexResponses = FieldCapabilitiesIndexResponse.readResponses(in); this.failures = in.readList(FieldCapabilitiesFailure::new); } @@ -137,7 +137,7 @@ private static Map readField(StreamInput in) throws I public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(indices); out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField); - out.writeList(indexResponses); + FieldCapabilitiesIndexResponse.writeResponses(out, indexResponses); out.writeList(failures); } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamOutput.java new file mode 100644 index 0000000000000..ed9a997a19280 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamOutput.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.io.stream; + +import java.io.IOException; + +public abstract class FilterStreamOutput extends StreamOutput { + private final StreamOutput out; + + public FilterStreamOutput(StreamOutput out) { + this.out = out; + } + + @Override + public void writeByte(byte b) throws IOException { + out.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + out.writeBytes(b, offset, length); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void reset() throws IOException { + out.reset(); + } +}