Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract proxy connection logic to specialized class #46898

Merged
merged 11 commits into from
Sep 25, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
Expand Down Expand Up @@ -113,7 +114,7 @@ public FollowersChecker(Settings settings, TransportService transportService,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
handleDisconnectedNode(node);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -103,7 +104,7 @@ public class LeaderChecker {

transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
handleDisconnectedNode(node);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -150,13 +148,13 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
} else {
logger.debug("connected to node [{}]", node);
try {
connectionListener.onNodeConnected(node);
connectionListener.onNodeConnected(node, conn);
} finally {
final Transport.Connection finalConnection = conn;
conn.addCloseListener(ActionListener.wrap(() -> {
logger.trace("unregistering {} after connection close and marking as disconnected", node);
connectedNodes.remove(node, finalConnection);
connectionListener.onNodeDisconnected(node);
connectionListener.onNodeDisconnected(node, conn);
}));
}
}
Expand Down Expand Up @@ -218,13 +216,6 @@ public int size() {
return connectedNodes.size();
}

/**
* Returns the set of nodes this manager is connected to.
*/
public Set<DiscoveryNode> connectedNodes() {
return Collections.unmodifiableSet(connectedNodes.keySet());
}

@Override
public void close() {
internalClose(true);
Expand Down Expand Up @@ -283,16 +274,16 @@ private static final class DelegatingNodeConnectionListener implements Transport
private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void onNodeDisconnected(DiscoveryNode key) {
public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
listener.onNodeDisconnected(key, connection);
}
}

@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
listener.onNodeConnected(node, connection);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
Expand Down Expand Up @@ -53,7 +52,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -78,7 +76,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
private static final Logger logger = LogManager.getLogger(RemoteClusterConnection.class);

private final TransportService transportService;
private final ConnectionManager connectionManager;
private final RemoteConnectionManager remoteConnectionManager;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
Expand Down Expand Up @@ -116,7 +114,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
this.connectionManager = connectionManager;
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
this.seedNodes = Collections.unmodifiableList(seedNodes);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
Expand Down Expand Up @@ -168,8 +166,8 @@ boolean isSkipUnavailable() {
}

@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (connectionManager.size() < maxNumRemoteConnections) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (remoteConnectionManager.size() < maxNumRemoteConnections) {
// try to reconnect and fill up the slot of the disconnected node
connectHandler.connect(ActionListener.wrap(
ignore -> logger.trace("successfully connected after disconnect of {}", node),
Expand All @@ -182,7 +180,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
* will invoke the listener immediately.
*/
void ensureConnected(ActionListener<Void> voidActionListener) {
if (connectionManager.size() == 0) {
if (remoteConnectionManager.size() == 0) {
connectHandler.connect(voidActionListener);
} else {
voidActionListener.onResponse(null);
Expand Down Expand Up @@ -211,8 +209,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
Transport.Connection connection = remoteConnectionManager.getAnyRemoteConnection();
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {

Expand Down Expand Up @@ -256,12 +253,7 @@ public String executor() {
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
if (connectionManager.nodeConnected(remoteClusterNode)) {
return connectionManager.getConnection(remoteClusterNode);
}
DiscoveryNode discoveryNode = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(discoveryNode);
return new ProxyConnection(connection, remoteClusterNode);
return remoteConnectionManager.getRemoteConnection(remoteClusterNode);
}

private Predicate<ClusterName> getRemoteClusterNamePredicate() {
Expand All @@ -280,67 +272,19 @@ public String toString() {
};
}


static final class ProxyConnection implements Transport.Connection {
private final Transport.Connection proxyConnection;
private final DiscoveryNode targetNode;

private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
this.proxyConnection = proxyConnection;
this.targetNode = targetNode;
}

@Override
public DiscoveryNode getNode() {
return targetNode;
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(targetNode, request), options);
}

@Override
public void close() {
assert false: "proxy connections must not be closed";
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
proxyConnection.addCloseListener(listener);
}

@Override
public boolean isClosed() {
return proxyConnection.isClosed();
}

@Override
public Version getVersion() {
return proxyConnection.getVersion();
}
}

Transport.Connection getConnection() {
return connectionManager.getConnection(getAnyConnectedNode());
return remoteConnectionManager.getAnyRemoteConnection();
}

@Override
public void close() throws IOException {
IOUtils.close(connectHandler);
connectionManager.closeNoBlock();
IOUtils.close(connectHandler, remoteConnectionManager);
}

public boolean isClosed() {
return connectHandler.isClosed();
}

public String getProxyAddress() {
return proxyAddress;
}

public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
return seedNodes;
}
Expand Down Expand Up @@ -456,14 +400,14 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, Act
final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG);
final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
try {
connectionManager.openConnection(seedNode, profile, openConnectionStep);
remoteConnectionManager.openConnection(seedNode, profile, openConnectionStep);
} catch (Exception e) {
onFailure.accept(e);
}

final StepListener<TransportService.HandshakeResponse> handShakeStep = new StepListener<>();
openConnectionStep.whenComplete(connection -> {
ConnectionProfile connectionProfile = connectionManager.getConnectionProfile();
ConnectionProfile connectionProfile = remoteConnectionManager.getConnectionManager().getConnectionProfile();
transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(),
getRemoteClusterNamePredicate(), handShakeStep);
}, onFailure);
Expand All @@ -472,8 +416,8 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, Act
handShakeStep.whenComplete(handshakeResponse -> {
final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());

if (nodePredicate.test(handshakeNode) && connectionManager.size() < maxNumRemoteConnections) {
connectionManager.connectToNode(handshakeNode, null,
if (nodePredicate.test(handshakeNode) && remoteConnectionManager.size() < maxNumRemoteConnections) {
remoteConnectionManager.connectToNode(handshakeNode, null,
transportService.connectionValidator(handshakeNode), fullConnectionStep);
} else {
fullConnectionStep.onResponse(null);
Expand Down Expand Up @@ -565,8 +509,8 @@ public void handleResponse(ClusterStateResponse response) {
private void handleNodes(Iterator<DiscoveryNode> nodesIter) {
while (nodesIter.hasNext()) {
final DiscoveryNode node = maybeAddProxyAddress(proxyAddress, nodesIter.next());
if (nodePredicate.test(node) && connectionManager.size() < maxNumRemoteConnections) {
connectionManager.connectToNode(node, null,
if (nodePredicate.test(node) && remoteConnectionManager.size() < maxNumRemoteConnections) {
remoteConnectionManager.connectToNode(node, null,
transportService.connectionValidator(node), new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
Expand Down Expand Up @@ -625,20 +569,7 @@ boolean assertNoRunningConnections() { // for testing only
}

boolean isNodeConnected(final DiscoveryNode node) {
return connectionManager.nodeConnected(node);
}

private final AtomicLong nextNodeId = new AtomicLong();

DiscoveryNode getAnyConnectedNode() {
List<DiscoveryNode> nodes = new ArrayList<>(connectionManager.connectedNodes());
if (nodes.isEmpty()) {
throw new NoSuchRemoteClusterException(clusterAlias);
} else {
long curr;
while ((curr = nextNodeId.incrementAndGet()) == Long.MIN_VALUE);
return nodes.get(Math.floorMod(curr, nodes.size()));
}
return remoteConnectionManager.getConnectionManager().nodeConnected(node);
}

/**
Expand All @@ -655,14 +586,14 @@ public RemoteConnectionInfo getConnectionInfo() {
}

int getNumNodesConnected() {
return connectionManager.size();
return remoteConnectionManager.size();
}

private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
return new ConnectionManager(connectionProfile, transportService.transport);
}

ConnectionManager getConnectionManager() {
return connectionManager;
return remoteConnectionManager.getConnectionManager();
}
}
Loading