
Deduplicate fields in FieldCaps intra-cluster messages (elastic#100022)
We have a stable equality check for individual field capabilities.
Even in the dynamic-mapping case, where mappings vary across indices, we
tend to have a lot of overlap across mappings. By deduplicating the
individual field capabilities instances when serializing, we can reduce
the size of the field caps messages massively in many cases.

A repeated field takes up only an extra ~4 bytes in the transport
message, which is negligible compared to the tens of MB these messages
can reach in the general case.
Even if deduplication were never to apply, this change should still
reduce response sizes: it adds only ~4 bytes of overhead for a
never-deduplicated field, but saves the redundant double writing of
field names we used to do (we wrote them both in the map key and in the
value), and it seems safe to assume that almost all field names are
longer than 4 bytes.
original-brownbear authored and piergm committed Oct 2, 2023
1 parent 50808f1 commit dca85e6
Showing 2 changed files with 68 additions and 1 deletion.
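
For illustration, here is a minimal, self-contained sketch of the write-side deduplication described above. FieldCaps and DedupWriteSketch are hypothetical stand-ins; the real code works on IndexFieldCapabilities and writes each offset with StreamOutput#writeInt, which is where the fixed ~4 bytes per repeated field comes from.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupWriteSketch {
    // Hypothetical stand-in for IndexFieldCapabilities; the real class has a
    // stable equals()/hashCode(), which a record gives us for free here.
    record FieldCaps(String name, String type, boolean searchable) {}

    public static void main(String[] args) {
        // Two mapping groups that overlap heavily, as is common even with
        // dynamic mappings that drift slightly across indices.
        List<FieldCaps> groupA = List.of(
            new FieldCaps("@timestamp", "date", true),
            new FieldCaps("message", "text", true)
        );
        List<FieldCaps> groupB = List.of(
            new FieldCaps("@timestamp", "date", true), // identical -> reuses offset 0
            new FieldCaps("message", "keyword", true)  // differs   -> gets a new offset
        );

        // Write side: the first occurrence claims the next offset, repeats
        // reuse it. On the wire each field then costs one fixed 4-byte int.
        Map<FieldCaps, Integer> dedup = new LinkedHashMap<>();
        for (List<FieldCaps> group : List.of(groupA, groupB)) {
            StringBuilder offsets = new StringBuilder();
            for (FieldCaps fc : group) {
                Integer offset = dedup.size();
                Integer found = dedup.putIfAbsent(fc, offset);
                offsets.append(found == null ? offset : found).append(' ');
            }
            System.out.println("group offsets: " + offsets); // "0 1" then "0 2"
        }

        // LinkedHashMap iterates its key set in insertion order, so the
        // distinct entries can be appended once, after all groups, and the
        // reader can rebuild them as a plain lookup array.
        System.out.println(dedup.size() + " distinct entries written once");
    }
}

The LinkedHashMap does double duty here: putIfAbsent assigns offsets on first sight, and its insertion-ordered key set is exactly the lookup table the reader needs.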
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -145,6 +145,8 @@ static TransportVersion def(int id) {
public static final TransportVersion WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED = def(8_502_00_0);
public static final TransportVersion RECOVERY_COMMIT_TOO_NEW_EXCEPTION_ADDED = def(8_503_00_0);
public static final TransportVersion NODE_INFO_COMPONENT_VERSIONS_ADDED = def(8_504_00_0);

public static final TransportVersion COMPACT_FIELD_CAPS_ADDED = def(8_505_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
67 changes: 66 additions & 1 deletion server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java
@@ -13,11 +13,13 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -67,6 +69,8 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

private record CompressedGroup(String[] indices, String mappingHash, int[] fields) {}

static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws IOException {
if (input.getTransportVersion().before(MAPPING_HASH_VERSION)) {
return input.readCollectionAsList(FieldCapabilitiesIndexResponse::new);
@@ -77,6 +81,37 @@ static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws I
responses.add(new FieldCapabilitiesIndexResponse(input));
}
final int groups = input.readVInt();
if (input.getTransportVersion().onOrAfter(TransportVersions.COMPACT_FIELD_CAPS_ADDED)) {
collectCompressedResponses(input, groups, responses);
} else {
collectResponsesLegacyFormat(input, groups, responses);
}
return responses;
}

private static void collectCompressedResponses(StreamInput input, int groups, ArrayList<FieldCapabilitiesIndexResponse> responses)
throws IOException {
final CompressedGroup[] compressedGroups = new CompressedGroup[groups];
for (int i = 0; i < groups; i++) {
final String[] indices = input.readStringArray();
final String mappingHash = input.readString();
compressedGroups[i] = new CompressedGroup(indices, mappingHash, input.readIntArray());
}
final IndexFieldCapabilities[] ifcLookup = input.readArray(IndexFieldCapabilities::readFrom, IndexFieldCapabilities[]::new);
for (CompressedGroup compressedGroup : compressedGroups) {
final Map<String, IndexFieldCapabilities> ifc = Maps.newMapWithExpectedSize(compressedGroup.fields.length);
for (int i : compressedGroup.fields) {
var val = ifcLookup[i];
ifc.put(val.name(), val);
}
for (String index : compressedGroup.indices) {
responses.add(new FieldCapabilitiesIndexResponse(index, compressedGroup.mappingHash, ifc, true));
}
}
}

private static void collectResponsesLegacyFormat(StreamInput input, int groups, ArrayList<FieldCapabilitiesIndexResponse> responses)
throws IOException {
for (int i = 0; i < groups; i++) {
final List<String> indices = input.readStringCollectionAsList();
final String mappingHash = input.readString();
@@ -85,7 +120,6 @@ static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws I
responses.add(new FieldCapabilitiesIndexResponse(index, mappingHash, ifc, true));
}
}
return responses;
}

static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> responses) throws IOException {
@@ -103,7 +137,19 @@ static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse>
ungroupedResponses.add(r);
}
}

output.writeCollection(ungroupedResponses);
if (output.getTransportVersion().onOrAfter(TransportVersions.COMPACT_FIELD_CAPS_ADDED)) {
writeCompressedResponses(output, groupedResponsesMap);
} else {
writeResponsesLegacyFormat(output, groupedResponsesMap);
}
}

private static void writeResponsesLegacyFormat(
StreamOutput output,
Map<String, List<FieldCapabilitiesIndexResponse>> groupedResponsesMap
) throws IOException {
output.writeCollection(groupedResponsesMap.values(), (o, fieldCapabilitiesIndexResponses) -> {
o.writeCollection(fieldCapabilitiesIndexResponses, (oo, r) -> oo.writeString(r.indexName));
var first = fieldCapabilitiesIndexResponses.get(0);
@@ -112,6 +158,25 @@ static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse>
});
}

private static void writeCompressedResponses(StreamOutput output, Map<String, List<FieldCapabilitiesIndexResponse>> groupedResponsesMap)
throws IOException {
final Map<IndexFieldCapabilities, Integer> fieldDedupMap = new LinkedHashMap<>();
output.writeCollection(groupedResponsesMap.values(), (o, fieldCapabilitiesIndexResponses) -> {
o.writeCollection(fieldCapabilitiesIndexResponses, (oo, r) -> oo.writeString(r.indexName));
var first = fieldCapabilitiesIndexResponses.get(0);
o.writeString(first.indexMappingHash);
o.writeVInt(first.responseMap.size());
for (IndexFieldCapabilities ifc : first.responseMap.values()) {
Integer offset = fieldDedupMap.size();
final Integer found = fieldDedupMap.putIfAbsent(ifc, offset);
o.writeInt(found == null ? offset : found);
}
});
// this is a linked hash map so the key-set is written in insertion order, so we can just write it out in order and then read it
// back as an array of IndexFieldCapabilities in #collectCompressedResponses to use as a lookup
output.writeCollection(fieldDedupMap.keySet());
}

/**
* Get the index name
*/
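
And a matching read-side sketch of what #collectCompressedResponses does with those offsets, with the lookup array and group offsets hard-coded in place of actual wire reads (DedupReadSketch and FieldCaps are again hypothetical stand-ins):

import java.util.LinkedHashMap;
import java.util.Map;

public class DedupReadSketch {
    // Same hypothetical stand-in for IndexFieldCapabilities as above.
    record FieldCaps(String name, String type, boolean searchable) {}

    public static void main(String[] args) {
        // Assume these were just read off the wire: the deduplicated entries
        // in writer insertion order, plus each group's offsets into them.
        FieldCaps[] lookup = {
            new FieldCaps("@timestamp", "date", true),
            new FieldCaps("message", "text", true),
            new FieldCaps("message", "keyword", true)
        };
        int[][] groupOffsets = { { 0, 1 }, { 0, 2 } };

        // Resolve each group's offsets back into a field-name -> capabilities
        // map; one such map is then shared by every index in the group.
        for (int[] offsets : groupOffsets) {
            Map<String, FieldCaps> byName = new LinkedHashMap<>();
            for (int i : offsets) {
                FieldCaps fc = lookup[i];
                byName.put(fc.name(), fc);
            }
            System.out.println(byName);
        }
    }
}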

