Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate fields in FieldCaps intra-cluster messages #100022

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ static TransportVersion def(int id) {
public static final TransportVersion WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED = def(8_502_00_0);
public static final TransportVersion RECOVERY_COMMIT_TOO_NEW_EXCEPTION_ADDED = def(8_503_00_0);
public static final TransportVersion NODE_INFO_COMPONENT_VERSIONS_ADDED = def(8_504_00_0);

public static final TransportVersion COMPACT_FIELD_CAPS_ADDED = def(8_505_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -67,6 +69,8 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

private record CompressedGroup(String[] indices, String mappingHash, int[] fields) {}

static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws IOException {
if (input.getTransportVersion().before(MAPPING_HASH_VERSION)) {
return input.readCollectionAsList(FieldCapabilitiesIndexResponse::new);
Expand All @@ -77,6 +81,37 @@ static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws I
responses.add(new FieldCapabilitiesIndexResponse(input));
}
final int groups = input.readVInt();
if (input.getTransportVersion().onOrAfter(TransportVersions.COMPACT_FIELD_CAPS_ADDED)) {
collectCompressedResponses(input, groups, responses);
} else {
collectResponsesLegacyFormat(input, groups, responses);
}
return responses;
}

/**
 * Reads the deduplicated (COMPACT_FIELD_CAPS_ADDED) wire format and appends the decoded
 * responses to {@code responses}. The format is: {@code groups} group descriptors (index
 * names, mapping hash, field offsets), followed by a single shared lookup table of
 * {@link IndexFieldCapabilities} that the descriptors reference by offset.
 */
private static void collectCompressedResponses(StreamInput input, int groups, ArrayList<FieldCapabilitiesIndexResponse> responses)
    throws IOException {
    // Wire order matters: every group descriptor precedes the shared lookup table.
    final List<CompressedGroup> groupList = new ArrayList<>(groups);
    for (int g = 0; g < groups; g++) {
        final String[] indexNames = input.readStringArray();
        final String hash = input.readString();
        groupList.add(new CompressedGroup(indexNames, hash, input.readIntArray()));
    }
    final IndexFieldCapabilities[] lookup = input.readArray(IndexFieldCapabilities::readFrom, IndexFieldCapabilities[]::new);
    for (CompressedGroup group : groupList) {
        // Rebuild the per-group field map by resolving each stored offset against the lookup table.
        final Map<String, IndexFieldCapabilities> fieldsByName = Maps.newMapWithExpectedSize(group.fields().length);
        for (int fieldOffset : group.fields()) {
            final IndexFieldCapabilities capability = lookup[fieldOffset];
            fieldsByName.put(capability.name(), capability);
        }
        // Every index in the group shares the same mapping hash and field map.
        for (String indexName : group.indices()) {
            responses.add(new FieldCapabilitiesIndexResponse(indexName, group.mappingHash(), fieldsByName, true));
        }
    }
}

private static void collectResponsesLegacyFormat(StreamInput input, int groups, ArrayList<FieldCapabilitiesIndexResponse> responses)
throws IOException {
for (int i = 0; i < groups; i++) {
final List<String> indices = input.readStringCollectionAsList();
final String mappingHash = input.readString();
Expand All @@ -85,7 +120,6 @@ static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws I
responses.add(new FieldCapabilitiesIndexResponse(index, mappingHash, ifc, true));
}
}
return responses;
}

static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> responses) throws IOException {
Expand All @@ -103,7 +137,19 @@ static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse>
ungroupedResponses.add(r);
}
}

output.writeCollection(ungroupedResponses);
if (output.getTransportVersion().onOrAfter(TransportVersions.COMPACT_FIELD_CAPS_ADDED)) {
writeCompressedResponses(output, groupedResponsesMap);
} else {
writeResponsesLegacyFormat(output, groupedResponsesMap);
}
}

private static void writeResponsesLegacyFormat(
StreamOutput output,
Map<String, List<FieldCapabilitiesIndexResponse>> groupedResponsesMap
) throws IOException {
output.writeCollection(groupedResponsesMap.values(), (o, fieldCapabilitiesIndexResponses) -> {
o.writeCollection(fieldCapabilitiesIndexResponses, (oo, r) -> oo.writeString(r.indexName));
var first = fieldCapabilitiesIndexResponses.get(0);
Expand All @@ -112,6 +158,25 @@ static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse>
});
}

/**
 * Writes the deduplicated (COMPACT_FIELD_CAPS_ADDED) wire format: for each group, the index
 * names, the shared mapping hash, and the fields of the group's first response encoded as
 * offsets into a deduplication table; then the table itself, in insertion order, so the
 * reader can rebuild it as a lookup array.
 */
private static void writeCompressedResponses(StreamOutput output, Map<String, List<FieldCapabilitiesIndexResponse>> groupedResponsesMap)
    throws IOException {
    // Each distinct IndexFieldCapabilities is assigned the offset at which it was first seen,
    // so identical fields shared across groups are serialized only once.
    final Map<IndexFieldCapabilities, Integer> fieldDedupMap = new LinkedHashMap<>();
    output.writeCollection(groupedResponsesMap.values(), (o, fieldCapabilitiesIndexResponses) -> {
        o.writeCollection(fieldCapabilitiesIndexResponses, (oo, r) -> oo.writeString(r.indexName));
        // All responses in a group share the same mapping hash and field map, so the first
        // response's metadata stands in for the whole group.
        var first = fieldCapabilitiesIndexResponses.get(0);
        o.writeString(first.indexMappingHash);
        o.writeVInt(first.responseMap.size());
        for (IndexFieldCapabilities ifc : first.responseMap.values()) {
            // The mapping function reads the map's size before the key is inserted, so the
            // first occurrence of each field receives the next sequential offset.
            o.writeInt(fieldDedupMap.computeIfAbsent(ifc, ignored -> fieldDedupMap.size()));
        }
    });
    // this is a linked hash map so the key-set is written in insertion order, so we can just write it out in order and then read it
    // back as an array of IndexFieldCapabilities in #collectCompressedResponses to use as a lookup
    output.writeCollection(fieldDedupMap.keySet());
}

/**
* Get the index name
*/
Expand Down