diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 926cc4801dddc..5f120134acb04 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -145,6 +145,8 @@ static TransportVersion def(int id) { public static final TransportVersion WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED = def(8_502_00_0); public static final TransportVersion RECOVERY_COMMIT_TOO_NEW_EXCEPTION_ADDED = def(8_503_00_0); public static final TransportVersion NODE_INFO_COMPONENT_VERSIONS_ADDED = def(8_504_00_0); + + public static final TransportVersion COMPACT_FIELD_CAPS_ADDED = def(8_505_00_0); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java index b1f844ed66215..e9e3a05169afc 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java @@ -13,11 +13,13 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -67,6 +69,8 @@ public void writeTo(StreamOutput out) throws IOException { } } + private record CompressedGroup(String[] indices, String mappingHash, int[] fields) {} + static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws IOException { if
(input.getTransportVersion().before(MAPPING_HASH_VERSION)) { return input.readCollectionAsList(FieldCapabilitiesIndexResponse::new); @@ -77,6 +81,37 @@ static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws I responses.add(new FieldCapabilitiesIndexResponse(input)); } final int groups = input.readVInt(); + if (input.getTransportVersion().onOrAfter(TransportVersions.COMPACT_FIELD_CAPS_ADDED)) { + collectCompressedResponses(input, groups, responses); + } else { + collectResponsesLegacyFormat(input, groups, responses); + } + return responses; + } + + private static void collectCompressedResponses(StreamInput input, int groups, ArrayList<FieldCapabilitiesIndexResponse> responses) + throws IOException { + final CompressedGroup[] compressedGroups = new CompressedGroup[groups]; + for (int i = 0; i < groups; i++) { + final String[] indices = input.readStringArray(); + final String mappingHash = input.readString(); + compressedGroups[i] = new CompressedGroup(indices, mappingHash, input.readIntArray()); + } + final IndexFieldCapabilities[] ifcLookup = input.readArray(IndexFieldCapabilities::readFrom, IndexFieldCapabilities[]::new); + for (CompressedGroup compressedGroup : compressedGroups) { + final Map<String, IndexFieldCapabilities> ifc = Maps.newMapWithExpectedSize(compressedGroup.fields.length); + for (int i : compressedGroup.fields) { + var val = ifcLookup[i]; + ifc.put(val.name(), val); + } + for (String index : compressedGroup.indices) { + responses.add(new FieldCapabilitiesIndexResponse(index, compressedGroup.mappingHash, ifc, true)); + } + } + } + + private static void collectResponsesLegacyFormat(StreamInput input, int groups, ArrayList<FieldCapabilitiesIndexResponse> responses) + throws IOException { for (int i = 0; i < groups; i++) { final List<String> indices = input.readStringCollectionAsList(); final String mappingHash = input.readString(); @@ -85,7 +120,6 @@ static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws I responses.add(new FieldCapabilitiesIndexResponse(index, mappingHash, ifc, true)); } } - return responses; } static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse>
responses) throws IOException { @@ -103,7 +137,19 @@ static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> ungroupedResponses.add(r); } } + output.writeCollection(ungroupedResponses); + if (output.getTransportVersion().onOrAfter(TransportVersions.COMPACT_FIELD_CAPS_ADDED)) { + writeCompressedResponses(output, groupedResponsesMap); + } else { + writeResponsesLegacyFormat(output, groupedResponsesMap); + } + } + + private static void writeResponsesLegacyFormat( + StreamOutput output, + Map<String, List<FieldCapabilitiesIndexResponse>> groupedResponsesMap + ) throws IOException { output.writeCollection(groupedResponsesMap.values(), (o, fieldCapabilitiesIndexResponses) -> { o.writeCollection(fieldCapabilitiesIndexResponses, (oo, r) -> oo.writeString(r.indexName)); var first = fieldCapabilitiesIndexResponses.get(0); @@ -112,6 +158,25 @@ static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> } }); } + private static void writeCompressedResponses(StreamOutput output, Map<String, List<FieldCapabilitiesIndexResponse>> groupedResponsesMap) + throws IOException { + final Map<IndexFieldCapabilities, Integer> fieldDedupMap = new LinkedHashMap<>(); + output.writeCollection(groupedResponsesMap.values(), (o, fieldCapabilitiesIndexResponses) -> { + o.writeCollection(fieldCapabilitiesIndexResponses, (oo, r) -> oo.writeString(r.indexName)); + var first = fieldCapabilitiesIndexResponses.get(0); + o.writeString(first.indexMappingHash); + o.writeVInt(first.responseMap.size()); + for (IndexFieldCapabilities ifc : first.responseMap.values()) { + Integer offset = fieldDedupMap.size(); + final Integer found = fieldDedupMap.putIfAbsent(ifc, offset); + o.writeInt(found == null ? offset : found); + } + }); + // this is a linked hash map so the key-set is written in insertion order, so we can just write it out in order and then read it + // back as an array of IndexFieldCapabilities in #collectCompressedResponses to use as a lookup + output.writeCollection(fieldDedupMap.keySet()); + } + /** * Get the index name */