Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open node connections asynchronously #35144

Merged
merged 32 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b26a55f
WIP
Tim-Brooks Oct 29, 2018
725d10e
WIP
Tim-Brooks Oct 29, 2018
bd536b4
WIP
Tim-Brooks Oct 30, 2018
ccf1b1a
WIP
Tim-Brooks Oct 30, 2018
ad5d503
WIP
Tim-Brooks Oct 30, 2018
1417eff
WIP
Tim-Brooks Oct 30, 2018
cc7f9fc
WIP
Tim-Brooks Oct 30, 2018
3781f4d
Move handshaking to specialized class
Tim-Brooks Oct 31, 2018
4cf0523
WIP
Tim-Brooks Oct 31, 2018
597baf1
Cleanup
Tim-Brooks Oct 31, 2018
ee2089e
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Oct 31, 2018
fbf4800
Fix test
Tim-Brooks Oct 31, 2018
b7812f8
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Oct 31, 2018
6db9478
Fix tests
Tim-Brooks Oct 31, 2018
a42ff48
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Oct 31, 2018
562bc49
Fix compile
Tim-Brooks Oct 31, 2018
46832ee
Fix checkstyle
Tim-Brooks Oct 31, 2018
7439bd1
Fix compile
Tim-Brooks Oct 31, 2018
eb5b270
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Nov 1, 2018
b3a751c
Fix test
Tim-Brooks Nov 1, 2018
6581590
Fix concurrency
Tim-Brooks Nov 1, 2018
e400c97
Changes
Tim-Brooks Nov 1, 2018
dd9d346
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Nov 1, 2018
15e6b74
Call method
Tim-Brooks Nov 1, 2018
1320f51
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Nov 2, 2018
18c6185
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Nov 3, 2018
2e71c4f
Changes for review
Tim-Brooks Nov 5, 2018
8406300
Fix checkstyle
Tim-Brooks Nov 5, 2018
c58e7f0
Fix checkstyle
Tim-Brooks Nov 5, 2018
660877a
Fix checkstyle
Tim-Brooks Nov 5, 2018
f71c30c
Change from review
Tim-Brooks Nov 6, 2018
03f496b
Merge remote-tracking branch 'upstream/master' into async_connections
Tim-Brooks Nov 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -81,8 +82,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,
namedWriteableRegistry, circuitBreakerService));
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,17 @@ public class Netty4TcpChannel implements TcpChannel {

private final Channel channel;
private final String profile;
private final CompletableContext<Void> connectContext;
private final CompletableContext<Void> closeContext = new CompletableContext<>();

Netty4TcpChannel(Channel channel, String profile) {
this(channel, profile, completedConnectContext());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have the extra ctor here? is it for testing? i would love to remove this one it might be worth going the extra mile and do this in every test explicitly.

}

Netty4TcpChannel(Channel channel, String profile, CompletableContext<Void> connectContext) {
this.channel = channel;
this.profile = profile;
this.connectContext = connectContext;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
Expand Down Expand Up @@ -72,6 +78,11 @@ public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public void addConnectListener(ActionListener<Void> listener) {
connectContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public void setSoLinger(int value) throws IOException {
if (channel.isOpen()) {
Expand Down Expand Up @@ -132,4 +143,10 @@ public String toString() {
", remoteAddress=" + channel.remoteAddress() +
'}';
}

private static CompletableContext<Void> completedConnectContext() {
CompletableContext<Void> connectContext = new CompletableContext<>();
connectContext.complete(null);
return connectContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -101,9 +102,9 @@ public class Netty4Transport extends TcpTransport {
private volatile Bootstrap clientBootstrap;
private volatile NioEventLoopGroup eventLoopGroup;

public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
this.workerCount = WORKER_COUNT.get(settings);

Expand Down Expand Up @@ -216,7 +217,7 @@ protected ChannelHandler getClientChannelInitializer(DiscoveryNode node) {
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");

@Override
protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOException {
InetSocketAddress address = node.getAddress().address();
Bootstrap bootstrapWithHandler = clientBootstrap.clone();
bootstrapWithHandler.handler(getClientChannelInitializer(node));
Expand All @@ -230,19 +231,21 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Vo
}
addClosedExceptionLogger(channel);

Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default");
CompletableContext<Void> connectContext = new CompletableContext<>();

Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default", connectContext);
channel.attr(CHANNEL_KEY).set(nettyChannel);

channelFuture.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
connectContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
listener.onFailure(new Exception(cause));
connectContext.completeExceptionally(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
connectContext.completeExceptionally((Exception) cause);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport.netty4;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -59,15 +60,15 @@ public void testScheduledPing() throws Exception {
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();

NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList());
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
final Netty4Transport nettyA = new Netty4Transport(settings, Version.CURRENT, threadPool,
new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
serviceA.start();
serviceA.acceptIncomingRequests();

final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
final Netty4Transport nettyB = new Netty4Transport(settings, Version.CURRENT, threadPool,
new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.netty4;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void startThreadPool() {
threadPool = new ThreadPool(settings);
NetworkService networkService = new NetworkService(Collections.emptyList());
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
nettyTransport = new Netty4Transport(settings, threadPool, networkService, bigArrays,
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays,
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public ExceptionThrowingNetty4Transport(
BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport.netty4;

import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc

private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
transport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.transport.netty4;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand All @@ -40,7 +41,6 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
Expand All @@ -54,23 +54,17 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {

@Override
public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
InterruptedException {
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
if (doHandshake) {
return super.executeHandshake(node, channel, timeout);
super.executeHandshake(node, channel, timeout, listener);
} else {
return version.minimumCompatibilityVersion();
listener.onResponse(version.minimumCompatibilityVersion());
}
}

@Override
protected Version getCurrentVersion() {
return version;
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.discovery.ec2;

import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -74,8 +73,7 @@ public static void stopThreadPool() throws InterruptedException {
public void createTransportService() {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
Version.CURRENT) {
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) {
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
// we just need to ensure we don't resolve DNS here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}

@Override
public void addConnectListener(ActionListener<Void> listener) {
addConnectListener(ActionListener.toBiConsumer(listener));
}

@Override
public void close() {
getContext().closeChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.elasticsearch.transport.nio;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -66,10 +66,10 @@ public class NioTransport extends TcpTransport {
private volatile NioGroup nioGroup;
private volatile TcpChannelFactory clientChannelFactory;

protected NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
this.pageCacheRecycler = pageCacheRecycler;
}

Expand All @@ -80,10 +80,9 @@ protected NioTcpServerChannel bind(String name, InetSocketAddress address) throw
}

@Override
protected NioTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
protected NioTcpChannel initiateChannel(DiscoveryNode node) throws IOException {
InetSocketAddress address = node.getAddress().address();
NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory);
channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -61,8 +62,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
return Collections.singletonMap(NIO_TRANSPORT_NAME,
() -> new NioTransport(settings, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry,
circuitBreakerService));
() -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler,
namedWriteableRegistry, circuitBreakerService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry,
circuitBreakerService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.transport.nio;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -57,24 +58,17 @@ public static MockTransportService nioFromThreadPool(Settings settings, ThreadPo
ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
NetworkService networkService = new NetworkService(Collections.emptyList());
Transport transport = new NioTransport(settings, threadPool,
networkService, BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry,
new NoneCircuitBreakerService()) {
Transport transport = new NioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {

@Override
public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
InterruptedException {
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
if (doHandshake) {
return super.executeHandshake(node, channel, timeout);
super.executeHandshake(node, channel, timeout, listener);
} else {
return version.minimumCompatibilityVersion();
listener.onResponse(version.minimumCompatibilityVersion());
}
}

@Override
protected Version getCurrentVersion() {
return version;
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
Expand Down
Loading