From e31809f400ba61a5548a47e8717055139c7aec11 Mon Sep 17 00:00:00 2001
From: David Turner <david.turner@elastic.co>
Date: Mon, 13 Sep 2021 17:35:14 +0100
Subject: [PATCH] Avoid early release of local forking requests (#77641)

Today we protect against releasing a request from a remote node until
its handler has completed, but we do not have the same protection for
requests from the local node. This commit adds the missing refcounting.

Relates #77407
Closes #77634
---
 .../transport/TransportService.java           | 56 ++++++++++++-------
 1 file changed, 35 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index 83a8bfa8cbf17..0fb1e49c1408d 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -875,33 +875,47 @@ private void sendLocalRequest(long requestId, final String action, final Transpo
             if (ThreadPool.Names.SAME.equals(executor)) {
                 reg.processMessageReceived(request, channel);
             } else {
-                threadPool.executor(executor).execute(new AbstractRunnable() {
-                    @Override
-                    protected void doRun() throws Exception {
-                        reg.processMessageReceived(request, channel);
-                    }
+                boolean success = false;
+                request.incRef();
+                try {
+                    threadPool.executor(executor).execute(new AbstractRunnable() {
+                        @Override
+                        protected void doRun() throws Exception {
+                            reg.processMessageReceived(request, channel);
+                        }
 
-                    @Override
-                    public boolean isForceExecution() {
-                        return reg.isForceExecution();
-                    }
+                        @Override
+                        public boolean isForceExecution() {
+                            return reg.isForceExecution();
+                        }
 
-                    @Override
-                    public void onFailure(Exception e) {
-                        try {
-                            channel.sendResponse(e);
-                        } catch (Exception inner) {
-                            inner.addSuppressed(e);
-                            logger.warn(() -> new ParameterizedMessage(
+                        @Override
+                        public void onFailure(Exception e) {
+                            try {
+                                channel.sendResponse(e);
+                            } catch (Exception inner) {
+                                inner.addSuppressed(e);
+                                logger.warn(() -> new ParameterizedMessage(
                                     "failed to notify channel of error message for action [{}]", action), inner);
+                            }
                         }
-                    }
 
-                    @Override
-                    public String toString() {
-                        return "processing of [" + requestId + "][" + action + "]: " + request;
+                        @Override
+                        public String toString() {
+                            return "processing of [" + requestId + "][" + action + "]: " + request;
+                        }
+
+                        @Override
+                        public void onAfter() {
+                            request.decRef();
+                        }
+                    });
+                    success = true;
+                } finally {
+                    if (success == false) {
+                        request.decRef();
                     }
-                });
+                }
             }
 
         } catch (Exception e) {