Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure we don't use a remote profile if cluster name matches #31331

Merged
merged 6 commits into from
Jun 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
private final ClusterName localClusterName;

/**
* Creates a new {@link RemoteClusterConnection}
Expand All @@ -100,6 +101,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
Expand Down Expand Up @@ -310,6 +312,21 @@ public boolean isClosed() {
return connectHandler.isClosed();
}

private ConnectionProfile getRemoteProfile(ClusterName name) {
// we can only compare the cluster name to make a decision if we should use a remote profile
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
// rather smallish optimization on the connection layer under certain situations where remote clusters
// have the same name as the local one is minor here.
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
if (this.localClusterName.equals(name)) {
return null;
} else {
return remoteProfile;
}
}

/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
Expand Down Expand Up @@ -419,7 +436,6 @@ protected void doRun() {
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
}
});

}

void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
Expand All @@ -431,21 +447,27 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode handshakeNode;
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
boolean success = false;
try {
try {
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
} catch (IllegalStateException ex) {
logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
throw ex;
}

final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, remoteProfile);
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my main question is, do we want to connect? I guess I don't see any functional difference. It just seems kind of weird that a "normal" cluster node connection is initiated from the RemoteClusterConnection and not the normal cluster formation work. And would be a catch in the future if we ever made a change to how normal cluster connections work.

Is there maybe a different race condition concern if we do not initiate the connection here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well I think if that cluster has coincidently the same name we would never instantiate this connection? If we don't connect, when would we connect to it? I can think of way more complicated things like waiting for the remote cluster to be formed etc. and then compare cluster UUIDs but that would require the local cluster to be formed and the remote one which is a complicating many many things. I think we can investigate this going forward ie. to only start connections once we have formed a cluster and never establish a connection to a node that isn't part of a cluster but this would be too complex to backport and this is a bug I want to be fixed in 5.6.x. I will create a followup issue for this and look into it what it takes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right. That makes sense.

if (remoteClusterName.get() == null) {
assert handshakeResponse.getClusterName().value() != null;
remoteClusterName.set(handshakeResponse.getClusterName());
}
connectedNodes.add(handshakeNode);
}
ClusterStateRequest request = new ClusterStateRequest();
Expand Down Expand Up @@ -552,7 +574,8 @@ public void handleResponse(ClusterStateResponse response) {
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, remoteProfile); // noop if node is connected
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
// connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with an version incompatible node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
}
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
if (validateConnections && node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}
Expand Down Expand Up @@ -378,7 +378,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP
public DiscoveryNode handshake(
final Transport.Connection connection,
final long handshakeTimeout) throws ConnectTransportException {
return handshake(connection, handshakeTimeout, clusterName::equals);
return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode;
}

/**
Expand All @@ -390,11 +390,11 @@ public DiscoveryNode handshake(
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @param clusterNamePredicate cluster name validation predicate
* @return the connected node
* @return the handshake response
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode handshake(
public HandshakeResponse handshake(
final Transport.Connection connection,
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
final HandshakeResponse response;
Expand All @@ -420,7 +420,7 @@ public HandshakeResponse newInstance() {
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
}

return response.discoveryNode;
return response;
}

static class HandshakeRequest extends TransportRequest {
Expand Down Expand Up @@ -461,6 +461,14 @@ public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
Version.writeVersion(version, out);
}

public DiscoveryNode getDiscoveryNode() {
return discoveryNode;
}

public ClusterName getClusterName() {
return clusterName;
}
}

public void disconnectFromNode(DiscoveryNode node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,102 @@ public static MockTransportService startTransport(
}
}

public void testLocalProfileIsUsedForLocalCluster() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
@Override
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
inst.readFrom(in);
return inst;
}
});
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
.build();
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
options, futureHandler);
futureHandler.txGet();
}
}
}
}

public void testRemoteProfileIsUsedForRemoteCluster() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool,
Settings.builder().put("cluster.name", "foobar").build());
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT,
threadPool, Settings.builder().put("cluster.name", "foobar").build())) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
@Override
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
inst.readFrom(in);
return inst;
}
});
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
.build();
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
service.sendRequest(discoverableNode,
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
futureHandler.txGet();
}).getCause();
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");

PlainTransportFuture<ClusterSearchShardsResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
@Override
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
inst.readFrom(in);
return inst;
}
});
TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
.build();
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
ops, handler);
handler.txGet();
}
}
}
}

public void testDiscoverSingleNode() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,22 @@ protected MockChannel initiateChannel(InetSocketAddress address, ActionListener<
@Override
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(LIGHT_PROFILE);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
Set<TransportRequestOptions.Type> types = handle.getTypes();
if (handle.length > 0) {
allTypesWithConnection.addAll(types);
} else {
allTypesWithoutConnection.addAll(types);
}
}
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
if (allTypesWithoutConnection.isEmpty() == false) {
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
}
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
return builder.build();
Expand Down