Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only connect to formed remote clusters #31424

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -50,7 +49,6 @@ public TransportMainAction(Settings settings, ThreadPool threadPool, TransportSe
protected void doExecute(MainRequest request, ActionListener<MainResponse> listener) {
ClusterState clusterState = clusterService.state();
assert Node.NODE_NAME_SETTING.exists(settings);
final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
listener.onResponse(
new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(),
clusterState.metaData().clusterUUID(), Build.CURRENT));
Expand Down
79 changes: 66 additions & 13 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
Expand Down Expand Up @@ -139,6 +140,7 @@
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
Expand All @@ -165,8 +167,10 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -678,17 +682,37 @@ public Node start() throws NodeValidationException {
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
discovery.startInitialJoin();
final ThreadPool thread = injector.getInstance(ThreadPool.class);
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
if (clusterState.nodes().getMasterNodeId() == null) {
final boolean connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
final boolean waitForState = initialStateTimeout.millis() > 0;
Predicate<ClusterState> connectRemoteClusterPredicate = state -> "_na_".equals(state.metaData().clusterUUID()) == false;
ClusterState clusterState = clusterService.state();
AtomicBoolean connectRemoteClusters = new AtomicBoolean(connectToRemote);
if (waitForState) {
CountDownLatch latch = new CountDownLatch(1);
Predicate<ClusterState> clusterStatePredicate = state -> state.nodes().getMasterNodeId() != null;
final Consumer<ClusterState> consumer;
if (connectToRemote) {
clusterStatePredicate = clusterStatePredicate.and(connectRemoteClusterPredicate);
connectRemoteClusters.set(false);
consumer = c -> transportService.getRemoteClusterService().initializeRemoteClusters(c.metaData().clusterUUID(),
c.metaData().settings(), ActionListener.wrap(v -> latch.countDown(), e -> {
latch.countDown();
logger.warn("Failed to connect to remote clusters", e);
}));
} else {
consumer = c -> latch.countDown();
}
if (clusterStatePredicate.test(clusterState) == false) {
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
final CountDownLatch latch = new CountDownLatch(1);
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, initialStateTimeout, logger,
thread.getThreadContext());
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) { latch.countDown(); }
public void onNewClusterState(ClusterState state) {
consumer.accept(state);
}

@Override
public void onClusterServiceClose() {
Expand All @@ -701,13 +725,42 @@ public void onTimeout(TimeValue timeout) {
initialStateTimeout);
latch.countDown();
}
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
}, clusterStatePredicate);
} else {
consumer.accept(clusterState);
}
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
if (connectRemoteClusters.get()) {
Consumer<ClusterState> consumer = state -> transportService.getRemoteClusterService().initializeRemoteClusters(
state.metaData().clusterUUID(), state.metaData().settings(), ActionListener.wrap(v -> {},
e -> logger.warn("Failed to connect to remote clusters", e)));
if (connectRemoteClusterPredicate.test(clusterState) == false) {
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger,
thread.getThreadContext());
//
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
consumer.accept(state);
}

try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
@Override
public void onClusterServiceClose() {
}

@Override
public void onTimeout(TimeValue timeout) {
assert false;
}
}, connectRemoteClusterPredicate);

} else {
consumer.accept(clusterState);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.main.MainAction;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -50,6 +54,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -83,11 +88,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private final String localClusterUUID;
private volatile List<DiscoveryNode> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
private final ClusterName localClusterName;
private final SetOnce<ClusterNameAndUUID> remoteClusterAndUUID = new SetOnce<>();

/**
* Creates a new {@link RemoteClusterConnection}
Expand All @@ -99,10 +104,15 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String localClusterUUID) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
if ("_na_".equals(localClusterUUID)) {
throw new IllegalArgumentException("invalid local clusterstate UUID: " + localClusterUUID);
}
this.localClusterUUID = Objects.requireNonNull(localClusterUUID);

this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
Expand Down Expand Up @@ -312,15 +322,15 @@ public boolean isClosed() {
return connectHandler.isClosed();
}

private ConnectionProfile getRemoteProfile(ClusterName name) {
private ConnectionProfile getRemoteProfile() {
// we can only compare the cluster name to make a decision if we should use a remote profile
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
// rather smallish optimization on the connection layer under certain situations where remote clusters
// have the same name as the local one is minor here.
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
if (this.localClusterName.equals(name)) {
if (remoteClusterAndUUID.get() != null && this.localClusterUUID.equals(remoteClusterAndUUID.get().clusterUUID)) {
return null;
} else {
return remoteProfile;
Expand Down Expand Up @@ -438,6 +448,67 @@ protected void doRun() {
});
}

Transport.Connection findFirstReadyNode(Iterator<DiscoveryNode> seedNodes) throws IOException, InterruptedException {
boolean remoteClusterHasNotFormed = false;
while (seedNodes.hasNext()) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("remote connect thread got interrupted");
}
final DiscoveryNode seedNode = seedNodes.next();
final MainResponse mainResponse;
boolean success = false;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
ThreadPool threadPool = transportService.getThreadPool();
ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we stash any context here since this is an internal execution and should not leak any
// existing context information.
threadContext.markAsSystemContext();
PlainTransportFuture<MainResponse> futureHandler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<MainResponse>() {
@Override
public MainResponse read(StreamInput in) throws IOException {
MainResponse response = MainAction.INSTANCE.newResponse();
response.readFrom(in);
return response;
}
});
TransportRequestOptions options = TransportRequestOptions.builder().withTimeout(remoteProfile
.getHandshakeTimeout().millis()).build();
transportService.sendRequest(connection, MainAction.NAME, new MainRequest(), options,
futureHandler);
mainResponse = futureHandler.txGet();
if ("_na_".equals(mainResponse.getClusterUuid()) == false) {
ClusterNameAndUUID clusterNameAndUUID = remoteClusterAndUUID.get();
if (clusterNameAndUUID == null) {
remoteClusterAndUUID.set(new ClusterNameAndUUID(mainResponse.getClusterName(),
mainResponse.getClusterUuid()));
} else if (clusterNameAndUUID.clusterName.equals(mainResponse.getClusterName()) == false) {
throw new IllegalStateException("handshake failed, mismatched cluster name [" + mainResponse.getClusterName()
+ "] - " + seedNode);
} else if (clusterNameAndUUID.clusterUUID.equals(mainResponse.getClusterUuid()) == false) {
throw new IllegalStateException("handshake failed, mismatched cluster UUID [" + mainResponse.getClusterUuid()
+ "] - " + seedNode);
}
success = true;
return connection;
} else {
remoteClusterHasNotFormed = true;
}
} finally {
if (success == false) {
connection.close();
}
}
}
if (remoteClusterHasNotFormed) {
throw new IllegalStateException("seed nodes have not joined a cluster yet");
} else {
throw new IllegalStateException("no seed node left");
}
}

void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
Expand All @@ -446,28 +517,23 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
Transport.Connection connection = findFirstReadyNode(seedNodes);
boolean success = false;
try {
try {
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
(c) -> c.equals(remoteClusterAndUUID.get().clusterName));
} catch (IllegalStateException ex) {
logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
"cluster name {}", connection.getNode(), remoteClusterAndUUID.get()), ex);
throw ex;
}

final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
if (remoteClusterName.get() == null) {
assert handshakeResponse.getClusterName().value() != null;
remoteClusterName.set(handshakeResponse.getClusterName());
}
transportService.connectToNode(handshakeNode, getRemoteProfile());
assert handshakeResponse.getClusterName().value() != null;
connectedNodes.add(handshakeNode);
}
ClusterStateRequest request = new ClusterStateRequest();
Expand Down Expand Up @@ -557,10 +623,6 @@ public ClusterStateResponse newInstance() {
@Override
public void handleResponse(ClusterStateResponse response) {
try {
if (remoteClusterName.get() == null) {
assert response.getClusterName().value() != null;
remoteClusterName.set(response.getClusterName());
}
try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes
// we have to close this connection before we notify listeners - this is mainly needed for test correctness
// since if we do it afterwards we might fail assertions that check if all high level connections are closed.
Expand All @@ -573,7 +635,7 @@ public void handleResponse(ClusterStateResponse response) {
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
transportService.connectToNode(node, getRemoteProfile()); // noop if node is
// connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
Expand Down Expand Up @@ -696,4 +758,14 @@ private synchronized void ensureIteratorAvailable() {
}
}
}

private static class ClusterNameAndUUID {
final ClusterName clusterName;
final String clusterUUID;

private ClusterNameAndUUID(ClusterName clusterName, String clusterUUID) {
this.clusterName = clusterName;
this.clusterUUID = clusterUUID;
}
}
}
Loading