-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NETWORKING: Make RemoteClusterConn. Lazy Resolve DNS #32764
Changes from 11 commits
fb9d964
f68901a
11a1f4b
290dab0
774fe5e
d660135
11997c3
2b64312
b3eb225
f2e8f9b
a22994a
ced240b
1114e17
8d10c13
f6b75c3
b86024f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
*/ | ||
package org.elasticsearch.transport; | ||
|
||
import java.util.function.Supplier; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
|
@@ -48,9 +49,19 @@ public abstract class RemoteClusterAware extends AbstractComponent { | |
/** | ||
* A list of initial seed nodes to discover eligible nodes from the remote cluster | ||
*/ | ||
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.", | ||
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress, | ||
Setting.Property.NodeScope, Setting.Property.Dynamic)); | ||
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting( | ||
"search.remote.", | ||
"seeds", | ||
key -> Setting.listSetting( | ||
key, Collections.emptyList(), | ||
s -> { | ||
// validate seed address | ||
parsePort(s); | ||
return s; | ||
}, | ||
Setting.Property.NodeScope, Setting.Property.Dynamic | ||
) | ||
); | ||
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; | ||
public static final String LOCAL_CLUSTER_GROUP_KEY = ""; | ||
|
||
|
@@ -65,18 +76,19 @@ protected RemoteClusterAware(Settings settings) { | |
this.clusterNameResolver = new ClusterNameExpressionResolver(settings); | ||
} | ||
|
||
protected static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) { | ||
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); | ||
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) { | ||
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); | ||
return allConcreteSettings.collect( | ||
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> { | ||
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting); | ||
List<DiscoveryNode> nodes = new ArrayList<>(); | ||
for (InetSocketAddress address : concreteSetting.get(settings)) { | ||
TransportAddress transportAddress = new TransportAddress(address); | ||
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(), | ||
transportAddress, | ||
Version.CURRENT.minimumCompatibilityVersion()); | ||
nodes.add(node); | ||
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can pre-size the list? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure will do :) |
||
for (String address : concreteSetting.get(settings)) { | ||
nodes.add(() -> { | ||
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); | ||
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(), | ||
transportAddress, | ||
Version.CURRENT.minimumCompatibilityVersion()); | ||
}); | ||
} | ||
return nodes; | ||
})); | ||
|
@@ -128,7 +140,7 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr | |
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is | ||
* empty the cluster alias is unregistered and should be removed. | ||
*/ | ||
protected abstract void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses); | ||
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses); | ||
|
||
/** | ||
* Registers this instance to listen to updates on the cluster settings. | ||
|
@@ -138,29 +150,37 @@ public void listenForUpdates(ClusterSettings clusterSettings) { | |
(namespace, value) -> {}); | ||
} | ||
|
||
private static InetSocketAddress parseSeedAddress(String remoteHost) { | ||
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 | ||
if (portSeparator == -1 || portSeparator == remoteHost.length()) { | ||
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); | ||
} | ||
String host = remoteHost.substring(0, portSeparator); | ||
protected static InetSocketAddress parseSeedAddress(String remoteHost) { | ||
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); | ||
InetAddress hostAddress; | ||
try { | ||
hostAddress = InetAddress.getByName(host); | ||
} catch (UnknownHostException e) { | ||
throw new IllegalArgumentException("unknown host [" + host + "]", e); | ||
} | ||
return new InetSocketAddress(hostAddress, parsePort(remoteHost)); | ||
} | ||
|
||
private static int parsePort(String remoteHost) { | ||
try { | ||
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1)); | ||
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1)); | ||
if (port <= 0) { | ||
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]"); | ||
} | ||
return new InetSocketAddress(hostAddress, port); | ||
return port; | ||
} catch (NumberFormatException e) { | ||
throw new IllegalArgumentException("port must be a number", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this error message can be misleading. For example, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I do not see a test for this case. |
||
} | ||
} | ||
|
||
private static int indexOfPortSeparator(String remoteHost) { | ||
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 | ||
if (portSeparator == -1 || portSeparator == remoteHost.length()) { | ||
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); | ||
} | ||
return portSeparator; | ||
} | ||
|
||
public static String buildRemoteIndexName(String clusterAlias, String indexName) { | ||
return clusterAlias != null ? clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These could probably be on a separate line each, too. 😇
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure will do :)