Sending Large Transport Messages Should be Optimized #82245

Closed
Tracked by #77466
original-brownbear opened this issue Jan 5, 2022 · 10 comments
Labels
>bug :Distributed Coordination/Network Http and internode communication implementations Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. team-discuss

Comments

@original-brownbear
Member

This is effectively the same problem as #79560 but for the transport layer.
Sending and receiving a large message on the transport layer currently entails buffering the whole message in full before sending or reading it. With recent improvements to field caps and can_match we're now working with more large transport messages and are seeing field_caps messages of O(100M) in some cases. Also, we've already been dealing with large cluster state messages for a while. As things are implemented right now, sending an uncompressed 100M message costs more than 200M in peak heap usage on both the sender and the receiver in cases like field caps, because we pay for both the message object itself on heap and the fully buffered serialized bytes.

As has been discussed previously in various situations, I think we need to improve the network layer to support more efficient streaming serialisation and deserialisation so that we stop buffering full messages going forward. I don't think we would necessarily have to change the over-the-wire format to do so, since the current format already contains quite a bit of size information, though adjustments to the format could make streaming reads easier.

Concretely I would suggest the following:

  • go back to serializing messages on the transport layer only once it's at least likely that they can be flushed to the network, to avoid wasting buffers, like we had in Serialize Outbound Message on Flush #57084 (the problems that made us revert this change have since been resolved and we could easily bring it back)
    • but go even further and only serialize the part of the message that we expect to be able to flush before writing it to the wire, then continue serialising once the channel is writable again
  • look into deserialising messages as we read them instead of buffering them in full up-front. Even with the current format this should be possible for things like field caps, albeit at the cost of re-reading parts of a message (we can optimistically try to deserialise and, if we fail to read part of a message in full, retry once we have more bytes buffered; see the sketch below)
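
As a rough sketch of the optimistic-retry idea in the last bullet (the OptimisticReader and Parser names are hypothetical, not existing transport classes): the parser is handed everything buffered so far and signals a short read via EOFException, in which case we simply try again after the next network read.

import java.io.EOFException;
import java.io.IOException;
import java.util.Optional;

// Hypothetical types for illustration only.
@FunctionalInterface
interface Parser<T> {
    T parse(byte[] buffered) throws IOException; // throws EOFException if the message is not fully buffered yet
}

final class OptimisticReader<T> {

    private final Parser<T> parser;

    OptimisticReader(Parser<T> parser) {
        this.parser = parser;
    }

    // Called after every network read with everything buffered so far. A short read just means
    // "try again later", at the cost of re-reading the already-parsed prefix of the message.
    Optional<T> tryRead(byte[] buffered) throws IOException {
        try {
            return Optional.of(parser.parse(buffered));
        } catch (EOFException e) {
            return Optional.empty(); // not enough bytes yet; retry once more are buffered
        }
    }
}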

relates #77466

@original-brownbear original-brownbear added >bug :Distributed Coordination/Network Http and internode communication implementations team-discuss needs:triage Requires assignment of a team area label labels Jan 5, 2022
@elasticmachine elasticmachine added the Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. label Jan 5, 2022
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@original-brownbear original-brownbear removed the needs:triage Requires assignment of a team area label label Jan 5, 2022
@DaveCTurner
Contributor

I'm not against the idea but I do wonder if it's optimising the wrong thing. Should we consider it a bug to need to send such large transport messages? Does it even make sense to send O(100M) field caps responses around? Are we sending O(100M) responses to the caller? If so, does that even make sense? If not, could we push some of the reduction work to the sender?

Another culprit is messages that contain the whole cluster state (publication and join validation) and I've been considering it a scalability bug that these go over the wire in a single message.

@original-brownbear
Member Author

I'm not against the idea but I do wonder if it's optimising the wrong thing. Should we consider it a bug to need to send such large transport messages?

It's probably a bug in many cases (for field caps it definitely is) but not in all of them (stats etc. are tricky to compress further, though possible I think), and then there are also just large search responses. So I'd reason about it this way:
We can either start fixing these messages one by one, which will take a lot of time, still leaves us with a somewhat inefficient transport layer, and does not address large search responses etc.
... or we just fix the network layer now and this becomes a low-priority bug I'd say. I don't think I have seen many issues, if any (outside of broken network setups), where network throughput was a real concern.

@DaveCTurner
Contributor

... serialize a part of the message ... fail to read part of a message ...

My main remaining question is: what is a "part" and how will this work? The de/serialization APIs are not async today so to work more incrementally we'd either need to block the thread or we'd have to introduce some asyncness. Would everything become async? I think that'd be a step too far. So maybe we will have to rely on callers defining appropriate boundaries between "parts".

@DaveCTurner
Contributor

DaveCTurner commented Feb 1, 2022

We (the @elastic/es-distributed team) discussed this again today. We still feel that unbounded-length messages are kind of a bug that we need to address on a case-by-case basis, but we also see some advantages in providing some transport-layer tools to help.

Henning raised the idea of "multi-responses", where the receiver would send a request and handle a sequence of responses rather than a single response like we do today. Potentially we'd be able to translate the chunked transport-layer response into chunks of a REST response, massively reducing the peak resource needs on the coordinating node. My feeling was that there aren't many cases for which that would be possible (e.g. it's going to be hard for most fan-out requests), and also we can kind of do the transport-layer side of this already today, see e.g. internal:index/shard/recovery/file_chunk.
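
As a rough illustration only (the interface name and methods below are assumptions, not an existing transport API), such a multi-response handler might look like:

// Hypothetical handler for a request that yields a sequence of transport-level responses.
interface MultiResponseHandler<T> {
    // invoked once per response chunk as it arrives
    void handleChunk(T chunk);

    // invoked after the final chunk, e.g. to terminate a chunked REST response on the coordinating node
    void handleCompletion();

    void handleFailure(Exception e);
}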

Armin proposed a more Netty-like API that makes de/serialization code more aware of its buffer usage; it will get a write-up in a separate comment. I think this would work, but I worry it would be a big deal to shift from one API to another, and I am also troubled by the amount of boring-but-tricky state management we'd have to add to our de/serialization code.

I'd like to explore an alternative extension to the existing APIs that lets users specify chunk boundaries in sensible places but uses callbacks to keep track of the serialization state. Something like this:

diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
index 437f986c6ad..d4fd0286d5e 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
@@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
+import org.elasticsearch.core.CheckedRunnable;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.gateway.MetadataStateFormat;
 import org.elasticsearch.index.Index;
@@ -1038,7 +1039,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
 
     public static final Version MAPPINGS_AS_HASH_VERSION = Version.V_8_1_0;
 
-    public static Metadata readFrom(StreamInput in) throws IOException {
+    public static void readFrom(StreamInput in, Consumer<Metadata> consumer) throws IOException {
         Builder builder = new Builder();
         builder.version = in.readLong();
         builder.clusterUUID = in.readString();
@@ -1065,21 +1066,34 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
         } else {
             mappingLookup = null;
         }
-        int size = in.readVInt();
-        for (int i = 0; i < size; i++) {
-            builder.put(IndexMetadata.readFrom(in, mappingLookup), false);
-        }
-        size = in.readVInt();
-        for (int i = 0; i < size; i++) {
-            builder.put(IndexTemplateMetadata.readFrom(in));
-        }
-        int customSize = in.readVInt();
-        for (int i = 0; i < customSize; i++) {
-            Custom customIndexMetadata = in.readNamedWriteable(Custom.class);
-            builder.putCustom(customIndexMetadata.getWriteableName(), customIndexMetadata);
+        final int indexCount = in.readVInt();
+        in.readAsync(() -> readIndexMetadata(in, indexCount, mappingLookup, imd -> builder.put(imd, false), () -> {
+            final int templateCount = in.readVInt();
+            for (int i = 0; i < templateCount; i++) {
+                builder.put(IndexTemplateMetadata.readFrom(in));
+            }
+            int customSize = in.readVInt();
+            for (int i = 0; i < customSize; i++) {
+                Custom customIndexMetadata = in.readNamedWriteable(Custom.class);
+                builder.putCustom(customIndexMetadata.getWriteableName(), customIndexMetadata);
+            }
+            consumer.accept(builder.build());
+        }));
+    }
+
+    private static void readIndexMetadata(
+        StreamInput in,
+        int remaining,
+        Function<String, MappingMetadata> mappingLookup,
+        Consumer<IndexMetadata> itemConsumer,
+        CheckedRunnable<IOException> onCompletion
+    ) throws IOException {
+        if (remaining > 0) {
+            itemConsumer.accept(IndexMetadata.readFrom(in, mappingLookup));
+            in.readAsync(() -> readIndexMetadata(in, remaining - 1, mappingLookup, itemConsumer, onCompletion));
+        } else {
+            onCompletion.run();
         }
-
-        return builder.build();
     }
 
     @Override
@@ -1100,14 +1114,27 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
         }
         out.writeVInt(indices.size());
         final boolean writeMappingsHash = out.getVersion().onOrAfter(MAPPINGS_AS_HASH_VERSION);
-        for (IndexMetadata indexMetadata : this) {
-            indexMetadata.writeTo(out, writeMappingsHash);
-        }
-        out.writeVInt(templates.size());
-        for (IndexTemplateMetadata template : templates.values()) {
-            template.writeTo(out);
+        out.writeAsync(() -> writeIndexMetadata(out, this.iterator(), writeMappingsHash, () -> {
+            out.writeVInt(templates.size());
+            for (IndexTemplateMetadata template : templates.values()) {
+                template.writeTo(out);
+            }
+            VersionedNamedWriteable.writeVersionedWritables(out, customs);
+        }));
+    }
+
+    public void writeIndexMetadata(
+        StreamOutput out,
+        Iterator<IndexMetadata> indexMetadataIterator,
+        boolean writeMappingsHash,
+        CheckedRunnable<IOException> onCompletion
+    ) throws IOException {
+        if (indexMetadataIterator.hasNext()) {
+            indexMetadataIterator.next().writeTo(out, writeMappingsHash);
+            out.writeAsync(() -> writeIndexMetadata(out, indexMetadataIterator, writeMappingsHash, onCompletion));
+        } else {
+            onCompletion.run();
         }
-        VersionedNamedWriteable.writeVersionedWritables(out, customs);
     }
 
     public static Builder builder() {
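
Roughly, the contract such readAsync/writeAsync methods would need might look as follows (a hypothetical sketch; these methods do not exist on StreamInput/StreamOutput today, only org.elasticsearch.core.CheckedRunnable is real):

import java.io.IOException;

import org.elasticsearch.core.CheckedRunnable;

// Hypothetical contracts for the readAsync/writeAsync calls used in the diff above.
interface AsyncStreamInput {
    // Runs continuation once more bytes are available to keep reading (possibly immediately),
    // instead of blocking the reading thread on a partially received message.
    void readAsync(CheckedRunnable<IOException> continuation);
}

interface AsyncStreamOutput {
    // Runs continuation once the channel is writable again, so serialization can pause at a
    // chunk boundary rather than buffering the whole message up front.
    void writeAsync(CheckedRunnable<IOException> continuation);
}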

@Tim-Brooks
Contributor

I'm still catching up on this particular conversation. Since our serialization uses tons of variable length encodings (Strings!) I'll be curious to hear what is discussed in the meeting as the solution to that particular issue in regards to serialization. I'll comment more once I know what is involved there.

I will say - I prototyped a chunked transfer encoding this fall and was pretty convinced we should move to that. We have supported fixed header sizes for long enough that it would be possible to have a header frame followed by data frames. My motivation is specifically that using a content-length type of encoding with compression will be very dangerous once we move serialization back onto the network thread. The risk of course is that compressing a 10MB message at once on the network thread leads to starvation. Incremental compression with flushes involves implicit context switches.

That change will require some doable transport format changes. What I am describing still aggregates at the transport level, but presumably it would be possible to expose that to the application level if we did want to split messages up into multiple chunked frames. There would be a decision to be made about whether we would allow frames from different messages to be interleaved.
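
Purely as an illustration of the header-frame/data-frame idea (none of these type or field names are the actual wire format), such a framing could look roughly like:

// Hypothetical framing sketch; the real transport format today is a single length-prefixed message.
enum FrameType {
    HEADER,   // fixed-size message header: request id, action, version, flags
    DATA,     // one chunk of (possibly compressed) payload, emitted as serialization progresses
    DATA_END  // final chunk, so the receiver can finish without knowing the total length up front
}

record Frame(FrameType type, long requestId, byte[] payload) {}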

I am not as familiar with the motivation for splitting messages, but some of the reasoning described in the issue makes sense to me.

@original-brownbear
Member Author

I thought about this a little more and I'm still a big fan of doing something like this sketch of the read side:

Note: the discard method would be similar to Netty's: it would slice off and, if possible, release the given number of bytes.

interface Reader {
    // true once done and this reader can be removed from the pipeline
    boolean doRead(BytesReference bytesReference) throws IOException;
}

public class MetadataReader implements Reader {

    boolean readFirstPart;

    int indexMetadataCount = -1;

    int indexMetadataRead = 0;

    Metadata.Builder builder = Metadata.builder();

    @Override
    public boolean doRead(BytesReference bytesReference) {
        if (readFirstPart == false) {
            if (bytesReference.length() < 4) {
                return false; // not enough bytes yet to read the length prefix
            }
            final int lengthOfInitialSection = bytesReference.getInt(0);
            if (bytesReference.length() < lengthOfInitialSection + 4) {
                return false; // initial section not fully buffered yet
            }
            final StreamInput streamInitialSection = bytesReference.slice(4, lengthOfInitialSection).streamInput();
            // read all the stuff before the index metadata into the builder
            readFirstPart = true;
            bytesReference = bytesReference.discard(4 + lengthOfInitialSection);
        }
        if (indexMetadataCount < 0) {
            if (bytesReference.length() < 4) {
                return false;
            }
            indexMetadataCount = bytesReference.getInt(0);
            bytesReference = bytesReference.discard(4);
        }
        while (indexMetadataRead < indexMetadataCount) {
            if (bytesReference.length() < 4) {
                return false;
            }
            final int lengthOfMetadata = bytesReference.getInt(0);
            if (lengthOfMetadata > bytesReference.length() - 4) {
                return false;
            }
            // read one index metadata entry into the builder, then release its bytes
            indexMetadataRead++;
            bytesReference = bytesReference.discard(lengthOfMetadata + 4);
        }
        // and so on for the remaining sections until the whole message has been consumed
        handleMetadata(builder.build());
        return true;
    }

    private void handleMetadata(Metadata metadata) {
        // add to a CS builder, serialize to REST, whatever else you want
    }
}

This would be used directly via a call to:

channel.read(reader);

which would be triggered after reading the message header.
It would get called every time there are bytes available, and we'd read whatever we can on every call.
Users of the interface could either do what the example does and add the parts read step by step to a builder, or do things like stream them out as REST to another channel directly. Memory would be released as soon as it's no longer needed via discard.
For writes you could do the exact same thing in a (I hope) somewhat straight-forward manner. A reader for a generic Writeable in the current protocol is fairly trivial to implement because the current protocol knows the full message size. Any existing Writeable reader could be wrapped via something like:

public class WritableReader<T extends Writeable> implements Reader {

    private final int size;
    private final Consumer<T> consumer;
    private final Writeable.Reader<T> reader;

    WritableReader(int size, Consumer<T> consumer, Writeable.Reader<T> reader) {
        this.size = size;
        this.consumer = consumer;
        this.reader = reader;
    }

    @Override
    public boolean doRead(BytesReference bytesReference) throws IOException {
        if (bytesReference.length() >= size) {
            // the full message is buffered: deserialize it the classic way and release its bytes
            final T value = reader.read(bytesReference.slice(0, size).streamInput());
            bytesReference.discard(size);
            consumer.accept(value);
            return true;
        }
        return false; // wait until the full message has been buffered
    }
}

and large messages could be ported as needed.
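
As a hypothetical usage example (channel, messageSize and handleClusterStateResponse are assumptions, not existing code), wrapping an existing Writeable reader would look something like:

// after decoding the header of a response whose body is messageSize bytes long, register a reader
// that waits for exactly that many bytes and then deserializes the response the same way as today
channel.read(new WritableReader<>(messageSize, this::handleClusterStateResponse, ClusterStateResponse::new));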

There are a couple of reasons why I like this approach better:

  • We need something like this under the hood anyway to build David's suggestion above, I believe. Just having the state implicitly on the "stack" won't work IMO; see the point below on stack overflow.
    • And by that it also allows building a nicer API like the above on top if we want it for some higher-level use cases.
  • This is definitely faster. For one, it has no tricky lambda call-sites, no hard-to-understand memory effects from capturing lambdas, and no risk of stack overflow (what happens if you have 5k index metadata instances and they all become available at once?).
  • We can build efficient stream + transcode for coordinating nodes in a relatively straightforward manner.
  • The write side could null out or release objects it has written already, just the same way discard works here, giving us transparent control over the amount of memory in flight.
  • Changing this API later will be hard: if we add yet another rather high-level API and don't capture all use cases now, we might regret it. If we expose a low-level API, we can always build a high-level one on top.
  • This approach to the API is widely used by Netty and unchanged in Netty 5, so it is easy to implement, does not hide the caller's threading, and is proven in the real world.

@henningandersen
Contributor

@original-brownbear
Would the approach described above not require the outer-level response/request object(s), all the way up to the root level, to have such a reader implementation? Implying ClusterStateResponse and ClusterState would need such a reader, in addition to the Metadata one? Also, I wonder how the configuration of which reader to use for a response would happen?

@original-brownbear
Member Author

Would the approach described above not require the outer level response/request object(s) until root level to have such a reader implementation?

Yes. If you want to "chunked-read" Metadata inside of a ClusterStateResponse you'll have to "chunked-read" ClusterStateResponse as well.

I wonder how the configuration of which reader to use for a response would happen?

Pretty much like today. You read the message header, it tells you the response id, you look up the reader and go from there. All that changes is that we'd interpret the response id as soon as we've read it, compared to today, where we fully buffer the message before interpreting the response id.
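
A minimal sketch of that lookup (the registry class, field and method names here are hypothetical, not the actual transport response-handler registry):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry wiring the Reader sketch above to inbound responses.
final class ResponseReaderRegistry {

    private final Map<Long, Reader> responseReaders = new ConcurrentHashMap<>();

    // called when the request is sent, just like response handlers are registered today
    void register(long requestId, Reader reader) {
        responseReaders.put(requestId, reader);
    }

    // called as soon as the header of an inbound response has been decoded
    void onResponseHeader(long responseId, Channel channel) {
        final Reader reader = responseReaders.remove(responseId);
        channel.read(reader); // doRead is then invoked with whatever bytes arrive next
    }

    // minimal stand-in for the proposed channel.read(reader) API
    interface Channel {
        void read(Reader reader);
    }
}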

@original-brownbear
Member Author

I'll close this one out for now. We had some good discussions here, we'll do this for REST via chunked encoding and we can revisit the transport layer if need be. For now we fixed this in a number of spots by reducing message size and that might be the correct approach everywhere going forward.
