From 0a5df4e492188a8cbc375b78d9ae0ed7b2c0d4ed Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 4 Jul 2020 15:26:37 -0400 Subject: [PATCH] Throw ESRejectedExecutionException when too many pending connect listeners --- .../transport/RemoteClusterConnection.java | 4 +- .../RemoteClusterConnectionTests.java | 63 +++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index f03b3bf27351..6dd01d83218e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -65,7 +66,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -442,7 +442,7 @@ private void connect(ActionListener connectListener, boolean forceRun) { ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext()); synchronized (queue) { if (listener != null && queue.offer(listener) == false) { - listener.onFailure(new RejectedExecutionException("connect queue is full")); + listener.onFailure(new EsRejectedExecutionException("connect queue is full")); return; } if (forceRun == false && queue.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 51520bd285e4..499f68c0c928 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -97,6 +98,7 @@ import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; @@ -1204,6 +1206,67 @@ public void onFailure(Exception e) { } } + public void testPendingConnectListeners() throws IOException, InterruptedException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + final Settings settings = Settings.builder() + .put(RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), 1).build(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, "test-cluster", + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { + ConnectionManager connectionManager = connection.getConnectionManager(); + CountDownLatch connectionOpenedLatch = new CountDownLatch(1); + CountDownLatch connectionBlockedLatch = new CountDownLatch(1); + connectionManager.addListener(new TransportConnectionListener() { + @Override + public void onConnectionOpened(Transport.Connection connection) { + connectionOpenedLatch.countDown(); + try { + connectionBlockedLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + }); + + Thread thread = new Thread(() -> connection.ensureConnected(ActionListener.wrap(() -> {}))); + thread.start(); + connectionOpenedLatch.await(); + connection.ensureConnected(ActionListener.wrap(() -> {})); + try { + int pendingConnections = randomIntBetween(1, 5); + for (int i = 0; i < pendingConnections; i++) { + AtomicReference error = new AtomicReference<>(); + connection.ensureConnected(new ActionListener() { + @Override + public void onResponse(Void aVoid) { + + } + + @Override + public void onFailure(Exception e) { + error.set(e); + } + }); + assertThat(error.get(), not(nullValue())); + assertThat(error.get(), instanceOf(EsRejectedExecutionException.class)); + } + } finally { + connectionBlockedLatch.countDown(); + thread.join(); + } + } + } + } + } + public void testCollectNodes() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {