Deserialize responses on the handling thread-pool #91367
Conversation
This is the start of moving message deserialization off of the transport threads where possible. This PR introduces the basic facilities to ref count transport message instances and fork their deserialization, which already provides some tangible benefits to transport thread latencies. For large messages (which are mostly responses) we cannot avoid forking in scenarios where responses can grow beyond O(1M), since deserializing e.g. an O(100M) cluster state or a similarly sized search response on the transport thread introduces unmanageable latency on the transport pool.

Some experimenting with aggressively forking things like index stats response handling shows visible master response time improvements and resulting speedups in e.g. snapshotting a large number of shards, which benefits from more responsive master nodes.

relates #77466 (improves some API latencies, but follow-ups are also needed here for e.g. transport broadcast actions)
relates #90622 (though it doesn't fix it; that would need additional work to fork the response handling)
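For illustration, the overall shape is roughly the following minimal sketch, not the PR's actual code: the helper name `forkResponseHandling`, the enclosing class and its `threadPool` field, and the exception wrapping are assumptions. The idea is to take a reference to the ref-counted inbound message before forking, deserialize on the handler's executor, and release the reference once the forked task has run.

```java
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.transport.InboundMessage;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;

import java.io.IOException;

// Hypothetical helper; assumes an enclosing class with a `threadPool` field.
private <T extends TransportResponse> void forkResponseHandling(
    InboundMessage message,
    TransportResponseHandler<T> handler,
    String executorName
) {
    message.incRef(); // keep the buffer alive while the task sits in the executor's queue
    threadPool.executor(executorName).execute(new AbstractRunnable() {
        @Override
        protected void doRun() throws IOException {
            // deserialization happens here, on the handler's thread pool,
            // instead of on the transport thread
            final T response = handler.read(message.openOrGetStreamInput());
            handler.handleResponse(response);
        }

        @Override
        public void onFailure(Exception e) {
            handler.handleException(new TransportException("failed to handle response", e));
        }

        @Override
        public void onAfter() {
            message.decRef(); // release the buffer whether handling succeeded or not
        }
    });
}
```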
Pinging @elastic/es-distributed (Team:Distributed)
Hi @original-brownbear, I've created a changelog YAML for you.
```java
    InetSocketAddress remoteAddress,
    final StreamInput stream,
    final Header header,
    Releasable releaseResponse
```
Can we name this like `releaseResponseBuffer` or `releaseResponseBytes` or something?
Sure, done.
```java
) {
    final T response;
    try {
        response = handler.read(stream);
        try (releaseResponse) {
```
Can't this go in the `try` block above? Is the second `try` necessary?
Right, sorry about that; it's obviously unnecessary.
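For reference, the merged shape looks roughly like this (a sketch of the idea only; the actual exception handling in the PR may differ):

```java
final T response;
try (releaseResponseBuffer) {
    // the buffer is released as soon as the response has been read,
    // whether or not deserialization throws
    response = handler.read(stream);
} catch (Exception e) {
    handler.handleException(new TransportException("failed to deserialize response", e));
    return;
}
handler.handleResponse(response);
```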
```java
threadPool.executor(executor).execute(new ForkingResponseHandlerRunnable(handler, null) {
    @Override
    protected void doRun() {
        doHandleResponse(handler, remoteAddress, stream, inboundMessage.getHeader(), releaseBuffer);
```
I guess I don't totally understand why we need to pass the `releaseBuffer` mechanism into the method here. `onAfter` already handles the release. I'm not totally clear why it matters if the `doHandleResponse` method clearly releases the thing. It's already being released in `onAfter` no matter what.
The motivation here was to release the buffer asap and not needlessly hold on to it until the handler is done with the deserialized message. The `onAfter` was just put in place as a final fail-safe.
In fact we should always release it in `doHandleResponse`; a response handler should never be rejected (see assertions in `ForkingResponseHandlerRunnable`) and there's no chance an exception could prevent it either. That said, I'm 👍 on paranoid leak prevention.
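One way to keep both the eager release in `doHandleResponse` and a fail-safe is to make the release idempotent, roughly like the sketch below. This assumes `org.elasticsearch.core.Releasables#releaseOnce` is available and that the anonymous runnable may override `onAfter`; it is not necessarily what the PR does.

```java
// wrap the decRef so that a second release is a no-op
final Releasable releaseBuffer = Releasables.releaseOnce(inboundMessage::decRef);
threadPool.executor(executor).execute(new ForkingResponseHandlerRunnable(handler, null) {
    @Override
    protected void doRun() {
        // releases the buffer right after deserialization, before the handler runs
        doHandleResponse(handler, remoteAddress, stream, inboundMessage.getHeader(), releaseBuffer);
    }

    @Override
    public void onAfter() {
        releaseBuffer.close(); // paranoid fail-safe; harmless if doRun already released it
    }
});
```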
```diff
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Releasable;

 import java.io.IOException;
 import java.util.Objects;

-public class InboundMessage implements Releasable {
+public class InboundMessage extends AbstractRefCounted {
```
Can we assert that ref count is greater than 0 when `openOrGetStreamInput` is called?
++ done
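The added assertion presumably looks something like the following sketch; the field names `streamInput` and `content` are illustrative only, and the real method body in the PR will differ.

```java
public StreamInput openOrGetStreamInput() throws IOException {
    assert refCount() > 0 : "reading from an InboundMessage whose buffer has already been released";
    if (streamInput == null) {
        streamInput = content.streamInput();
    }
    return streamInput;
}
```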
Thanks Tim, all points addressed now I think!
I left some suggestions/comments
LGTM
Thanks Tim and David!
Just a follow-up comment/question here about memory usage and backpressure. Previously, a single thread would read bytes from the network and turn them into objects before passing them off to the executor's queue. Now we effectively put the raw bytes on the executor's queue, which I guess can happen substantially faster. I think this is ok: we don't really track the size of the deserialised objects anywhere, and they're typically going to be larger than the raw bytes. But we can enqueue more items in the queue now, and maybe the fuller queue of bytes takes more memory than the previous, bottlenecked queue of objects. What backpressure mechanisms kick in to avoid this becoming a problem? These queues are (mostly) bounded, so eventually we hit a limit there. And I think this doesn't apply to indexing, which IIUC manages the forking itself, so it looks like it runs on …
Maybe, but on the other hand there are effects that go in the other direction: for example, outbound messages go out quicker, and they always take buffer + object memory (and have higher overhead from our 16k page size than inbound buffers, which are a little more optimally sized).
As in the above, the real memory and transport circuit breakers should indirectly deal with this in practice for most cases, I believe. See above though; I don't think this is a real concern in practice.
I don't think so, this is about responses. Instinctively I'm not worried here, and this definitely fixes observed bugs ... but I agree that it would be nice to go further and do better on the auto-read situation.