From 5974a13d35711e4266510e9a2b5c4403a46eec90 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 19 Nov 2019 17:28:07 -0700 Subject: [PATCH 01/10] WIP --- .../common/settings/Setting.java | 14 +++++- .../transport/RemoteConnectionStrategy.java | 48 +++++++++++++++++-- .../transport/SimpleConnectionStrategy.java | 5 +- .../transport/SniffConnectionStrategy.java | 14 ++++-- 4 files changed, 70 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 8848392aade82..17fd3e5071deb 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -675,6 +675,11 @@ public static class AffixSetting extends Setting { private final Set dependencies; public AffixSetting(AffixKey key, Setting delegate, Function> delegateFactory, AffixSetting... dependencies) { + this(key, delegate, delegateFactory, (s) -> (v) -> {}, dependencies); + } + + public AffixSetting(AffixKey key, Setting delegate, Function> delegateFactory, + Function> validatorFunction, AffixSetting... dependencies) { super(key, delegate.defaultValue, delegate.parser, delegate.properties.toArray(new Property[0])); this.key = key; this.delegateFactory = delegateFactory; @@ -689,6 +694,7 @@ private Stream matchStream(Settings settings) { return settings.keySet().stream().filter(this::match).map(key::getConcreteString); } + @Override public Set> getSettingsDependencies(String settingsKey) { if (dependencies.isEmpty()) { return Collections.emptySet(); @@ -1066,6 +1072,12 @@ public static Setting intSetting(String key, int defaultValue, int minV properties); } + public static Setting intSetting(String key, int defaultValue, int minValue, Validator validator, + Property... properties) { + return new Setting<>(key, Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key, isFiltered(properties)), validator, + properties); + } + public static Setting intSetting(String key, Setting fallbackSetting, int minValue, Property... properties) { return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key, isFiltered(properties)), properties); } @@ -1326,7 +1338,7 @@ public static Setting> listSetting( return listSetting(key, fallbackSetting, singleValueParser, defaultStringValue, v -> {}, properties); } - static Setting> listSetting( + public static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, final Function singleValueParser, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 78d831b878bd3..71d51ef4221a9 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -40,13 +40,16 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,11 +61,11 @@ enum ConnectionStrategy { SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings); private final int numberOfChannels; - private final Supplier>> enabledSettings; + private final Supplier>> enablementSettings; - ConnectionStrategy(int numberOfChannels, Supplier>> enabledSettings) { + ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings) { this.numberOfChannels = numberOfChannels; - this.enabledSettings = enabledSettings; + this.enablementSettings = enablementSettings; } } @@ -121,7 +124,7 @@ static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportServ static Set getRemoteClusters(Settings settings) { final Stream> enablementSettings = Arrays.stream(ConnectionStrategy.values()) - .flatMap(strategy -> strategy.enabledSettings.get()); + .flatMap(strategy -> strategy.enablementSettings.get()); return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet()); } @@ -319,4 +322,41 @@ private boolean connectionProfileChanged(ConnectionProfile oldProfile, Connectio return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; } + + static class StrategyValidator implements Setting.Validator { + + private final ConnectionStrategy expectedStrategy; + private final Setting settingsKey; + private final Consumer valueChecker; + + StrategyValidator(String namespace, ConnectionStrategy expectedStrategy) { + this(namespace, expectedStrategy, (v) -> {}); + + } + + StrategyValidator(String namespace, ConnectionStrategy expectedStrategy, Consumer valueChecker) { + this.expectedStrategy = expectedStrategy; + this.settingsKey = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + this.valueChecker = valueChecker; + } + + @Override + public void validate(T value) { + valueChecker.accept(value); + } + + @Override + public void validate(T value, Map, Object> settings) { + ConnectionStrategy modeType = (ConnectionStrategy) settings.get(settingsKey); + if (modeType.equals(expectedStrategy) == false) { + throw new IllegalArgumentException(""); + } + } + + @Override + public Iterator> settings() { + Stream> settingStream = Stream.of(settingsKey); + return settingStream.iterator(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 97e0d8a36a00c..582146ed34a62 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -55,7 +55,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { // validate address parsePort(s); return s; - }, Setting.Property.Dynamic, Setting.Property.NodeScope)); + }, new StrategyValidator<>(key, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * The maximum number of socket connections that will be established to a remote cluster. The default is 18. @@ -63,7 +63,8 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "simple.socket_connections", - key -> intSetting(key, 18, 1, Setting.Property.Dynamic, Setting.Property.NodeScope)); + key -> intSetting(key, 18, 1, new StrategyValidator<>(key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, + Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 725c6a6e6fb16..c42cee9649067 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -52,6 +53,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -75,6 +77,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { parsePort(s); return s; }, + new StrategyValidator<>(key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -91,7 +94,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { // validate seed address parsePort(s); return s; - }, Setting.Property.Dynamic, Setting.Property.NodeScope)); + }, + s -> Collections.emptyList(), + new StrategyValidator<>(key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in * the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect @@ -103,11 +109,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { "proxy", key -> Setting.simpleString( key, - s -> { + new StrategyValidator<>(key, ConnectionStrategy.SNIFF, s -> { if (Strings.hasLength(s)) { parsePort(s); } - }, + }), Setting.Property.Dynamic, Setting.Property.NodeScope), REMOTE_CLUSTER_SEEDS); @@ -119,7 +125,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "sniff.node_connections", - key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1, + key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1, new StrategyValidator<>(key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 6; From 1ff6d8aeb990a7dc86b5a1a081b579d8de08a554 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 19 Nov 2019 18:55:46 -0700 Subject: [PATCH 02/10] WIP --- .../common/settings/Setting.java | 26 ++++--- .../transport/RemoteConnectionStrategy.java | 20 +++-- .../transport/SimpleConnectionStrategy.java | 16 ++-- .../transport/SniffConnectionStrategy.java | 73 ++++++++++--------- .../transport/RemoteClusterServiceTests.java | 2 +- 5 files changed, 75 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 17fd3e5071deb..4366673b2e35d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -675,11 +675,6 @@ public static class AffixSetting extends Setting { private final Set dependencies; public AffixSetting(AffixKey key, Setting delegate, Function> delegateFactory, AffixSetting... dependencies) { - this(key, delegate, delegateFactory, (s) -> (v) -> {}, dependencies); - } - - public AffixSetting(AffixKey key, Setting delegate, Function> delegateFactory, - Function> validatorFunction, AffixSetting... dependencies) { super(key, delegate.defaultValue, delegate.parser, delegate.properties.toArray(new Property[0])); this.key = key; this.delegateFactory = delegateFactory; @@ -1329,6 +1324,15 @@ public static Setting> listSetting( return listSetting(key, null, singleValueParser, defaultStringValue, properties); } + public static Setting> listSetting( + final String key, + final Function singleValueParser, + final Function> defaultStringValue, + final Validator> validator, + final Property... properties) { + return listSetting(key, null, singleValueParser, defaultStringValue, validator, properties); + } + public static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, @@ -1338,7 +1342,7 @@ public static Setting> listSetting( return listSetting(key, fallbackSetting, singleValueParser, defaultStringValue, v -> {}, properties); } - public static Setting> listSetting( + private static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, final Function singleValueParser, @@ -1609,8 +1613,8 @@ public static AffixSetting affixKeySetting(String prefix, String suffix, return affixKeySetting(new AffixKey(prefix, suffix), delegateFactory, dependencies); } - private static AffixSetting affixKeySetting(AffixKey key, Function> delegateFactory, - AffixSetting... dependencies) { + public static AffixSetting affixKeySetting(AffixKey key, Function> delegateFactory, + AffixSetting... dependencies) { Setting delegate = delegateFactory.apply("_na_"); return new AffixSetting<>(key, delegate, delegateFactory, dependencies); } @@ -1687,11 +1691,11 @@ public static final class AffixKey implements Key { private final String prefix; private final String suffix; - AffixKey(String prefix) { + public AffixKey(String prefix) { this(prefix, null); } - AffixKey(String prefix, String suffix) { + public AffixKey(String prefix, String suffix) { assert prefix != null || suffix != null: "Either prefix or suffix must be non-null"; this.prefix = prefix; @@ -1726,7 +1730,7 @@ String getConcreteString(String key) { /** * Returns a string representation of the concrete setting key */ - String getNamespace(String key) { + public String getNamespace(String key) { Matcher matcher = pattern.matcher(key); if (matcher.matches() == false) { throw new IllegalStateException("can't get concrete string for key " + key + " key doesn't match"); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 71d51ef4221a9..eb398ecb2e171 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -326,17 +326,21 @@ private boolean connectionProfileChanged(ConnectionProfile oldProfile, Connectio static class StrategyValidator implements Setting.Validator { private final ConnectionStrategy expectedStrategy; - private final Setting settingsKey; + private final String namespace; private final Consumer valueChecker; - StrategyValidator(String namespace, ConnectionStrategy expectedStrategy) { - this(namespace, expectedStrategy, (v) -> {}); + StrategyValidator(Setting.AffixKey affixKey, String key, ConnectionStrategy expectedStrategy) { + this(affixKey, key, expectedStrategy, (v) -> {}); } - StrategyValidator(String namespace, ConnectionStrategy expectedStrategy, Consumer valueChecker) { + StrategyValidator(Setting.AffixKey affixKey, String key, ConnectionStrategy expectedStrategy, Consumer valueChecker) { this.expectedStrategy = expectedStrategy; - this.settingsKey = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + if ("_na_".equals(key)) { + this.namespace = key; + } else { + this.namespace = affixKey.getNamespace(key); + } this.valueChecker = valueChecker; } @@ -347,7 +351,8 @@ public void validate(T value) { @Override public void validate(T value, Map, Object> settings) { - ConnectionStrategy modeType = (ConnectionStrategy) settings.get(settingsKey); + Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + ConnectionStrategy modeType = (ConnectionStrategy) settings.get(concrete); if (modeType.equals(expectedStrategy) == false) { throw new IllegalArgumentException(""); } @@ -355,7 +360,8 @@ public void validate(T value, Map, Object> settings) { @Override public Iterator> settings() { - Stream> settingStream = Stream.of(settingsKey); + Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); + Stream> settingStream = Stream.of(concrete); return settingStream.iterator(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 582146ed34a62..02196813fc753 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -48,23 +48,25 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { * A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin * fashion. */ + private static final Setting.AffixKey REMOTE_CLUSTER_ADDRESSES_KEY = new Setting.AffixKey("cluster.remote.", "addresses"); public static final Setting.AffixSetting> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting( - "cluster.remote.", - "addresses", + REMOTE_CLUSTER_ADDRESSES_KEY, key -> Setting.listSetting(key, Collections.emptyList(), s -> { // validate address parsePort(s); return s; - }, new StrategyValidator<>(key, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); + }, new StrategyValidator<>(REMOTE_CLUSTER_ADDRESSES_KEY, key, ConnectionStrategy.SIMPLE), + Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * The maximum number of socket connections that will be established to a remote cluster. The default is 18. */ + private static final Setting.AffixKey REMOTE_SOCKET_CONNECTIONS_KEY = new Setting.AffixKey( + "cluster.remote.", "simple.socket_connections"); public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( - "cluster.remote.", - "simple.socket_connections", - key -> intSetting(key, 18, 1, new StrategyValidator<>(key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, - Setting.Property.NodeScope)); + REMOTE_SOCKET_CONNECTIONS_KEY, + key -> intSetting(key, 18, 1, new StrategyValidator<>(REMOTE_SOCKET_CONNECTIONS_KEY, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index c42cee9649067..2adfc66d117a7 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -45,7 +45,6 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -53,7 +52,6 @@ import java.util.Objects; import java.util.Set; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -66,66 +64,69 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { /** * A list of initial seed nodes to discover eligible nodes from the remote cluster */ + private static final Setting.AffixKey REMOTE_CLUSTER_SEEDS_OLD_KEY = new Setting.AffixKey("cluster.remote.", "seeds"); public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS_OLD = Setting.affixKeySetting( - "cluster.remote.", - "seeds", - key -> Setting.listSetting( - key, - Collections.emptyList(), - s -> { - // validate seed address - parsePort(s); - return s; - }, - new StrategyValidator<>(key, ConnectionStrategy.SNIFF), - Setting.Property.Dynamic, - Setting.Property.NodeScope)); + REMOTE_CLUSTER_SEEDS_OLD_KEY, + key -> Setting.listSetting( + key, + Collections.emptyList(), + s -> { + // validate seed address + parsePort(s); + return s; + }, + new StrategyValidator<>(REMOTE_CLUSTER_SEEDS_OLD_KEY, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); /** * A list of initial seed nodes to discover eligible nodes from the remote cluster */ + private static final Setting.AffixKey REMOTE_CLUSTER_SEEDS_KEY = new Setting.AffixKey("cluster.remote.", "sniff.seeds"); public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting( - "cluster.remote.", - "sniff.seeds", + REMOTE_CLUSTER_SEEDS_KEY, key -> Setting.listSetting(key, - "_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key) - : REMOTE_CLUSTER_SEEDS_OLD.getConcreteSetting(key.replaceAll("sniff\\.seeds$", "seeds")), s -> { // validate seed address parsePort(s); return s; }, - s -> Collections.emptyList(), - new StrategyValidator<>(key, ConnectionStrategy.SNIFF), - Setting.Property.Dynamic, Setting.Property.NodeScope)); + s -> "_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key).get(s) : + REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(REMOTE_CLUSTER_SEEDS_KEY.getNamespace(key)).get(s), + new StrategyValidator<>(REMOTE_CLUSTER_SEEDS_KEY, key, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); + + /** * A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in * the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect * to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately * undocumented as it does not work well with all proxies. */ + private static final Setting.AffixKey REMOTE_CLUSTERS_PROXY_KEY = new Setting.AffixKey("cluster.remote.", "proxy"); public static final Setting.AffixSetting REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting( - "cluster.remote.", - "proxy", - key -> Setting.simpleString( - key, - new StrategyValidator<>(key, ConnectionStrategy.SNIFF, s -> { - if (Strings.hasLength(s)) { - parsePort(s); - } - }), - Setting.Property.Dynamic, - Setting.Property.NodeScope), + REMOTE_CLUSTERS_PROXY_KEY, + key -> Setting.simpleString( + key, + new StrategyValidator<>(REMOTE_CLUSTERS_PROXY_KEY, key, ConnectionStrategy.SNIFF, s -> { + if (Strings.hasLength(s)) { + parsePort(s); + } + }), + Setting.Property.Dynamic, + Setting.Property.NodeScope), REMOTE_CLUSTER_SEEDS); /** * The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. */ + private static final Setting.AffixKey REMOTE_NODE_CONNECTIONS_KEY = new Setting.AffixKey("cluster.remote.", "sniff.node_connections"); public static final Setting.AffixSetting REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting( - "cluster.remote.", - "sniff.node_connections", - key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1, new StrategyValidator<>(key, ConnectionStrategy.SNIFF), + REMOTE_NODE_CONNECTIONS_KEY, + key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1, + new StrategyValidator<>(REMOTE_NODE_CONNECTIONS_KEY, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 6; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 60e7a848bbb18..f5fa4feab5cee 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -811,7 +811,7 @@ public void testSkipUnavailable() { private static Settings createSettings(String clusterAlias, List seeds) { Settings.Builder builder = Settings.builder(); - if (randomBoolean()) { + if (true) { builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(), Strings.collectionToCommaDelimitedString(seeds)); } else { From 0126003095f456a4861b59a7fca67b22dc63c2f6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 19 Nov 2019 19:11:03 -0700 Subject: [PATCH 03/10] Fix test --- .../main/java/org/elasticsearch/common/settings/Setting.java | 2 +- .../org/elasticsearch/transport/SniffConnectionStrategy.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 4366673b2e35d..371a2e81fd575 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -1342,7 +1342,7 @@ public static Setting> listSetting( return listSetting(key, fallbackSetting, singleValueParser, defaultStringValue, v -> {}, properties); } - private static Setting> listSetting( + public static Setting> listSetting( final String key, final @Nullable Setting> fallbackSetting, final Function singleValueParser, diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 2adfc66d117a7..0c2b72ac572fe 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -86,6 +86,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting( REMOTE_CLUSTER_SEEDS_KEY, key -> Setting.listSetting(key, + "_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key) + : REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(REMOTE_CLUSTER_SEEDS_KEY.getNamespace(key)), s -> { // validate seed address parsePort(s); From c140e6f72ca354a69728f4143070b6c868a07c43 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Nov 2019 08:36:55 -0700 Subject: [PATCH 04/10] Chnages --- .../common/settings/Setting.java | 49 +++++++++++++------ .../transport/RemoteConnectionStrategy.java | 13 ++--- .../transport/SimpleConnectionStrategy.java | 17 +++---- .../transport/SniffConnectionStrategy.java | 44 +++++++++-------- .../transport/RemoteClusterServiceTests.java | 2 +- 5 files changed, 70 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 371a2e81fd575..83e05e4d06b83 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -53,6 +53,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.regex.Matcher; @@ -671,10 +672,10 @@ public String toString() { public static class AffixSetting extends Setting { private final AffixKey key; - private final Function> delegateFactory; + private final BiFunction> delegateFactory; private final Set dependencies; - public AffixSetting(AffixKey key, Setting delegate, Function> delegateFactory, AffixSetting... dependencies) { + public AffixSetting(AffixKey key, Setting delegate, BiFunction> delegateFactory, AffixSetting... dependencies) { super(key, delegate.defaultValue, delegate.parser, delegate.properties.toArray(new Property[0])); this.key = key; this.delegateFactory = delegateFactory; @@ -714,7 +715,7 @@ public Map, T> getValue(Settings curren final Map, T> result = new IdentityHashMap<>(); Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); - Setting concreteSetting = getConcreteSetting(aKey); + Setting concreteSetting = getConcreteSetting(namespace, aKey); AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater((v) -> consumer.accept(namespace, v), logger, (v) -> validator.accept(namespace, v)); @@ -752,7 +753,7 @@ public Map getValue(Settings current, Settings previous) { final Map result = new IdentityHashMap<>(); Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); - Setting concreteSetting = getConcreteSetting(aKey); + Setting concreteSetting = getConcreteSetting(namespace, aKey); AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater((v) -> {}, logger, (v) -> validator.accept(namespace, v)); if (updater.hasChanged(current, previous)) { @@ -787,7 +788,16 @@ public String innerGetRaw(final Settings settings) { @Override public Setting getConcreteSetting(String key) { if (match(key)) { - return delegateFactory.apply(key); + String namespace = this.key.getNamespace(key); + return delegateFactory.apply(namespace, key); + } else { + throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't."); + } + } + + private Setting getConcreteSetting(String namespace, String key) { + if (match(key)) { + return delegateFactory.apply(namespace, key); } else { throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't."); } @@ -798,7 +808,7 @@ public Setting getConcreteSetting(String key) { */ public Setting getConcreteSettingForNamespace(String namespace) { String fullKey = key.toConcreteKey(namespace).toString(); - return getConcreteSetting(fullKey); + return getConcreteSetting(namespace, fullKey); } @Override @@ -835,8 +845,9 @@ public Set getNamespaces(Settings settings) { public Map getAsMap(Settings settings) { Map map = new HashMap<>(); matchStream(settings).distinct().forEach(key -> { - Setting concreteSetting = getConcreteSetting(key); - map.put(getNamespace(concreteSetting), concreteSetting.get(settings)); + String namespace = this.key.getNamespace(key); + Setting concreteSetting = getConcreteSetting(namespace, key); + map.put(namespace, concreteSetting.get(settings)); }); return Collections.unmodifiableMap(map); } @@ -1600,7 +1611,8 @@ public int hashCode() { * {@link #getConcreteSetting(String)} is used to pull the updater. */ public static AffixSetting prefixKeySetting(String prefix, Function> delegateFactory) { - return affixKeySetting(new AffixKey(prefix), delegateFactory); + BiFunction> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k); + return affixKeySetting(new AffixKey(prefix), delegateFactoryWithNamespace); } /** @@ -1610,12 +1622,19 @@ public static AffixSetting prefixKeySetting(String prefix, Function AffixSetting affixKeySetting(String prefix, String suffix, Function> delegateFactory, AffixSetting... dependencies) { - return affixKeySetting(new AffixKey(prefix, suffix), delegateFactory, dependencies); + BiFunction> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k); + return affixKeySetting(new AffixKey(prefix, suffix), delegateFactoryWithNamespace, dependencies); + } + + public static AffixSetting affixKeySetting(String prefix, String suffix, BiFunction> delegateFactory, + AffixSetting... dependencies) { + Setting delegate = delegateFactory.apply("_na_", "_na_"); + return new AffixSetting<>(new AffixKey(prefix, suffix), delegate, delegateFactory, dependencies); } - public static AffixSetting affixKeySetting(AffixKey key, Function> delegateFactory, + private static AffixSetting affixKeySetting(AffixKey key, BiFunction> delegateFactory, AffixSetting... dependencies) { - Setting delegate = delegateFactory.apply("_na_"); + Setting delegate = delegateFactory.apply("_na_", "_na_"); return new AffixSetting<>(key, delegate, delegateFactory, dependencies); } @@ -1691,11 +1710,11 @@ public static final class AffixKey implements Key { private final String prefix; private final String suffix; - public AffixKey(String prefix) { + AffixKey(String prefix) { this(prefix, null); } - public AffixKey(String prefix, String suffix) { + AffixKey(String prefix, String suffix) { assert prefix != null || suffix != null: "Either prefix or suffix must be non-null"; this.prefix = prefix; @@ -1730,7 +1749,7 @@ String getConcreteString(String key) { /** * Returns a string representation of the concrete setting key */ - public String getNamespace(String key) { + String getNamespace(String key) { Matcher matcher = pattern.matcher(key); if (matcher.matches() == false) { throw new IllegalStateException("can't get concrete string for key " + key + " key doesn't match"); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index eb398ecb2e171..31a58531cafcf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -329,18 +329,13 @@ static class StrategyValidator implements Setting.Validator { private final String namespace; private final Consumer valueChecker; - StrategyValidator(Setting.AffixKey affixKey, String key, ConnectionStrategy expectedStrategy) { - this(affixKey, key, expectedStrategy, (v) -> {}); - + StrategyValidator(String namespace, ConnectionStrategy expectedStrategy) { + this(namespace, expectedStrategy, (v) -> {}); } - StrategyValidator(Setting.AffixKey affixKey, String key, ConnectionStrategy expectedStrategy, Consumer valueChecker) { + StrategyValidator(String namespace, ConnectionStrategy expectedStrategy, Consumer valueChecker) { this.expectedStrategy = expectedStrategy; - if ("_na_".equals(key)) { - this.namespace = key; - } else { - this.namespace = affixKey.getNamespace(key); - } + this.namespace = namespace; this.valueChecker = valueChecker; } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 02196813fc753..1de5f2334c50e 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -48,24 +48,23 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { * A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin * fashion. */ - private static final Setting.AffixKey REMOTE_CLUSTER_ADDRESSES_KEY = new Setting.AffixKey("cluster.remote.", "addresses"); public static final Setting.AffixSetting> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting( - REMOTE_CLUSTER_ADDRESSES_KEY, - key -> Setting.listSetting(key, Collections.emptyList(), s -> { + "cluster.remote.", + "addresses", + (ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> { // validate address parsePort(s); return s; - }, new StrategyValidator<>(REMOTE_CLUSTER_ADDRESSES_KEY, key, ConnectionStrategy.SIMPLE), + }, new StrategyValidator<>(ns, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * The maximum number of socket connections that will be established to a remote cluster. The default is 18. */ - private static final Setting.AffixKey REMOTE_SOCKET_CONNECTIONS_KEY = new Setting.AffixKey( - "cluster.remote.", "simple.socket_connections"); public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( - REMOTE_SOCKET_CONNECTIONS_KEY, - key -> intSetting(key, 18, 1, new StrategyValidator<>(REMOTE_SOCKET_CONNECTIONS_KEY, key, ConnectionStrategy.SNIFF), + "cluster.remote.", + "simple.socket_connections", + (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; @@ -81,7 +80,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private final ConnectionManager.ConnectionValidator clusterNameValidator; SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - Settings settings) { + Settings settings) { this( clusterAlias, transportService, diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 0c2b72ac572fe..61aa8189ec391 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -64,10 +64,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { /** * A list of initial seed nodes to discover eligible nodes from the remote cluster */ - private static final Setting.AffixKey REMOTE_CLUSTER_SEEDS_OLD_KEY = new Setting.AffixKey("cluster.remote.", "seeds"); public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS_OLD = Setting.affixKeySetting( - REMOTE_CLUSTER_SEEDS_OLD_KEY, - key -> Setting.listSetting( + "cluster.remote.", + "seeds", + (ns, key) -> Setting.listSetting( key, Collections.emptyList(), s -> { @@ -75,27 +75,25 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { parsePort(s); return s; }, - new StrategyValidator<>(REMOTE_CLUSTER_SEEDS_OLD_KEY, key, ConnectionStrategy.SNIFF), + new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * A list of initial seed nodes to discover eligible nodes from the remote cluster */ - private static final Setting.AffixKey REMOTE_CLUSTER_SEEDS_KEY = new Setting.AffixKey("cluster.remote.", "sniff.seeds"); public static final Setting.AffixSetting> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting( - REMOTE_CLUSTER_SEEDS_KEY, - key -> Setting.listSetting(key, - "_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key) - : REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(REMOTE_CLUSTER_SEEDS_KEY.getNamespace(key)), + "cluster.remote.", + "sniff.seeds", + (ns, key) -> Setting.listSetting(key, + REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns), s -> { // validate seed address parsePort(s); return s; }, - s -> "_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key).get(s) : - REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(REMOTE_CLUSTER_SEEDS_KEY.getNamespace(key)).get(s), - new StrategyValidator<>(REMOTE_CLUSTER_SEEDS_KEY, key, ConnectionStrategy.SNIFF), + s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s), + new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -106,12 +104,12 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { * to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately * undocumented as it does not work well with all proxies. */ - private static final Setting.AffixKey REMOTE_CLUSTERS_PROXY_KEY = new Setting.AffixKey("cluster.remote.", "proxy"); public static final Setting.AffixSetting REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting( - REMOTE_CLUSTERS_PROXY_KEY, - key -> Setting.simpleString( + "cluster.remote.", + "proxy", + (ns, key) -> Setting.simpleString( key, - new StrategyValidator<>(REMOTE_CLUSTERS_PROXY_KEY, key, ConnectionStrategy.SNIFF, s -> { + new StrategyValidator<>(ns, ConnectionStrategy.SNIFF, s -> { if (Strings.hasLength(s)) { parsePort(s); } @@ -124,12 +122,16 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { * The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. */ - private static final Setting.AffixKey REMOTE_NODE_CONNECTIONS_KEY = new Setting.AffixKey("cluster.remote.", "sniff.node_connections"); public static final Setting.AffixSetting REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting( - REMOTE_NODE_CONNECTIONS_KEY, - key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1, - new StrategyValidator<>(REMOTE_NODE_CONNECTIONS_KEY, key, ConnectionStrategy.SNIFF), - Setting.Property.Dynamic, Setting.Property.NodeScope)); + "cluster.remote.", + "sniff.node_connections", + (ns, key) -> intSetting( + key, + RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, + 1, + new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), + Setting.Property.Dynamic, + Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 6; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index f5fa4feab5cee..60e7a848bbb18 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -811,7 +811,7 @@ public void testSkipUnavailable() { private static Settings createSettings(String clusterAlias, List seeds) { Settings.Builder builder = Settings.builder(); - if (true) { + if (randomBoolean()) { builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(), Strings.collectionToCommaDelimitedString(seeds)); } else { From d64c97c275e863c7ecb77dc8a57ecb3cfa2f9b26 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Nov 2019 08:43:01 -0700 Subject: [PATCH 05/10] Register settings --- .../org/elasticsearch/common/settings/ClusterSettings.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index c2aeeac3f49d9..9bd14e0991601 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -102,6 +102,8 @@ import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteConnectionStrategy; +import org.elasticsearch.transport.SimpleConnectionStrategy; import org.elasticsearch.transport.SniffConnectionStrategy; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.watcher.ResourceWatcherService; @@ -287,6 +289,9 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, + SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, + SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, From a8d1844a72ea212337a9f0118d53cfaccf87577d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Nov 2019 13:39:24 -0700 Subject: [PATCH 06/10] Changes --- .../common/settings/ClusterSettings.java | 2 +- .../common/settings/Setting.java | 5 +-- .../transport/RemoteClusterConnection.java | 2 +- .../transport/RemoteClusterService.java | 11 ------ .../transport/RemoteConnectionStrategy.java | 18 +++++++--- .../transport/SimpleConnectionStrategy.java | 4 +-- .../transport/SniffConnectionStrategy.java | 20 ++++++++--- .../RemoteClusterConnectionTests.java | 2 +- .../transport/RemoteClusterServiceTests.java | 7 ++-- .../transport/RemoteClusterSettingsTests.java | 2 +- .../RemoteConnectionStrategyTests.java | 16 ++++----- .../SimpleConnectionStrategyTests.java | 36 +++++++++++++++++++ .../SniffConnectionStrategyTests.java | 35 ++++++++++++++++++ .../core/security/authc/RealmSettings.java | 2 +- 14 files changed, 122 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9bd14e0991601..f51dfb62e0593 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -283,7 +283,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, - RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, + SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, RemoteClusterService.REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 83e05e4d06b83..2a6b2f10999ff 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -675,7 +675,8 @@ public static class AffixSetting extends Setting { private final BiFunction> delegateFactory; private final Set dependencies; - public AffixSetting(AffixKey key, Setting delegate, BiFunction> delegateFactory, AffixSetting... dependencies) { + public AffixSetting(AffixKey key, Setting delegate, BiFunction> delegateFactory, + AffixSetting... dependencies) { super(key, delegate.defaultValue, delegate.parser, delegate.properties.toArray(new Property[0])); this.key = key; this.delegateFactory = delegateFactory; @@ -1633,7 +1634,7 @@ public static AffixSetting affixKeySetting(String prefix, String suffix, } private static AffixSetting affixKeySetting(AffixKey key, BiFunction> delegateFactory, - AffixSetting... dependencies) { + AffixSetting... dependencies) { Setting delegate = delegateFactory.apply("_na_", "_na_"); return new AffixSetting<>(key, delegate, delegateFactory, dependencies); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 16f13e57e7223..2ab24ee089903 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -47,7 +47,7 @@ * in the remote cluster and connects to all eligible nodes, for details see {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}. * * In the case of a disconnection, this class will issue a re-connect task to establish at most - * {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of + * {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of * connections per cluster has been reached. */ final class RemoteClusterConnection implements Closeable { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 3a1ab055a05cf..ba1084de80571 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -64,17 +64,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private static final ActionListener noopListener = ActionListener.wrap((x) -> {}, (x) -> {}); - /** - * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single - * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. - */ - public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = - Setting.intSetting( - "cluster.remote.connections_per_cluster", - 3, - 1, - Setting.Property.NodeScope); - /** * The initial connect timeout for remote cluster connections */ diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 31a58531cafcf..4d11aebeefab5 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -67,6 +67,10 @@ enum ConnectionStrategy { this.numberOfChannels = numberOfChannels; this.enablementSettings = enablementSettings; } + + public int getNumberOfChannels() { + return numberOfChannels; + } } public static final Setting.AffixSetting REMOTE_CONNECTION_MODE = Setting.affixKeySetting( @@ -74,6 +78,7 @@ enum ConnectionStrategy { key, ConnectionStrategy.SNIFF.name(), value -> ConnectionStrategy.valueOf(value.toUpperCase(Locale.ROOT)), + Setting.Property.NodeScope, Setting.Property.Dynamic)); @@ -325,17 +330,19 @@ private boolean connectionProfileChanged(ConnectionProfile oldProfile, Connectio static class StrategyValidator implements Setting.Validator { + private final String key; private final ConnectionStrategy expectedStrategy; private final String namespace; private final Consumer valueChecker; - StrategyValidator(String namespace, ConnectionStrategy expectedStrategy) { - this(namespace, expectedStrategy, (v) -> {}); + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy) { + this(namespace, key, expectedStrategy, (v) -> {}); } - StrategyValidator(String namespace, ConnectionStrategy expectedStrategy, Consumer valueChecker) { - this.expectedStrategy = expectedStrategy; + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, Consumer valueChecker) { this.namespace = namespace; + this.key = key; + this.expectedStrategy = expectedStrategy; this.valueChecker = valueChecker; } @@ -349,7 +356,8 @@ public void validate(T value, Map, Object> settings) { Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); ConnectionStrategy modeType = (ConnectionStrategy) settings.get(concrete); if (modeType.equals(expectedStrategy) == false) { - throw new IllegalArgumentException(""); + throw new IllegalArgumentException("Setting \"" + key + "\" cannot be used with the configured \"" + concrete.getKey() + + "\" [required=" + expectedStrategy.name() + ", configured=" + modeType.name() + "]"); } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 1de5f2334c50e..0a50e77f6c334 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -55,7 +55,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { // validate address parsePort(s); return s; - }, new StrategyValidator<>(ns, ConnectionStrategy.SIMPLE), + }, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); /** @@ -64,7 +64,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "simple.socket_connections", - (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), + (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 61aa8189ec391..ee56629ebf0aa 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -75,7 +75,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { parsePort(s); return s; }, - new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -93,7 +93,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { return s; }, s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s), - new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -109,7 +109,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { "proxy", (ns, key) -> Setting.simpleString( key, - new StrategyValidator<>(ns, ConnectionStrategy.SNIFF, s -> { + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, s -> { if (Strings.hasLength(s)) { parsePort(s); } @@ -118,6 +118,16 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { Setting.Property.NodeScope), REMOTE_CLUSTER_SEEDS); + /** + * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single + * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. + */ + public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = + intSetting( + "cluster.remote.connections_per_cluster", + 3, + 1, + Setting.Property.NodeScope); /** * The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. @@ -127,9 +137,9 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { "sniff.node_connections", (ns, key) -> intSetting( key, - RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, + REMOTE_CONNECTIONS_PER_CLUSTER, 1, - new StrategyValidator<>(ns, ConnectionStrategy.SNIFF), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index dd229e8f4fec5..ddded559512f7 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -330,7 +330,7 @@ public void testGetConnectionInfo() throws Exception { int maxNumConnections = randomIntBetween(1, 5); String clusterAlias = "test-cluster"; Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) - .put(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); + .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 60e7a848bbb18..0fdc797b8b377 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -76,15 +76,18 @@ private MockTransportService startTransport( } public void testSettingsAreRegistered() { - assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); - assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_COMPRESS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS)); } public void testRemoteClusterSeedSetting() { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java index 80206eaf2b21d..af855314278f7 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java @@ -29,7 +29,7 @@ import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD; import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER; +import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_NODE_ATTRIBUTE; import static org.hamcrest.Matchers.emptyCollectionOf; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 3bae8b4c9559b..5ea54c7356b94 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -72,14 +72,14 @@ public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { public void testCorrectChannelNumber() { String clusterAlias = "cluster-alias"; - String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); - Settings simpleSettings = Settings.builder().put(settingKey, "simple").build(); - ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings); - assertEquals(1, simpleProfile.getNumConnections()); - - Settings sniffSettings = Settings.builder().put(settingKey, "sniff").build(); - ConnectionProfile sniffProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, sniffSettings); - assertEquals(6, sniffProfile.getNumConnections()); + + for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) { + String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); + Settings simpleSettings = Settings.builder().put(settingKey, strategy.name()).build(); + ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings); + assertEquals("Incorrect number of channels for " + strategy.name(), + strategy.getNumberOfChannels(), simpleProfile.getNumConnections()); + } } private static class FakeConnectionStrategy extends RemoteConnectionStrategy { diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 95297bf33e931..35a6b7a6758ac 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -22,6 +22,10 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.AbstractScopedSettings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; @@ -32,7 +36,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -297,6 +303,36 @@ numOfConnections, addresses(address), Collections.singletonList(addressSupplier } } + public void testModeSettingsCannotBeUsedWhenInDifferentMode() { + List, String>> restrictedSettings = Arrays.asList( + new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"), + new Tuple<>(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3")); + + RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF; + + String clusterName = "cluster_name"; + Settings settings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), sniff.name()) + .build(); + + Set> clusterSettings = new HashSet<>(); + clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); + clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList())); + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + + // Should validate successfully + service.validate(settings, true); + + for (Tuple, String> restrictedSetting : restrictedSettings) { + Setting concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName); + Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true)); + String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " + + "\"cluster.remote.cluster_name.mode\" [required=SIMPLE, configured=SNIFF]"; + assertEquals(expected, iae.getMessage()); + } + } + private static List addresses(final TransportAddress... addresses) { return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList()); } diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 758b5dca101e5..721055a9c20f7 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -30,6 +30,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.AbstractScopedSettings; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -42,6 +45,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -637,6 +641,37 @@ public void testGetNodePredicatesCombination() { } } + public void testModeSettingsCannotBeUsedWhenInDifferentMode() { + List, String>> restrictedSettings = Arrays.asList( + new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, "192.168.0.1:8080"), + new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, "192.168.0.1:8080"), + new Tuple<>(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, "2")); + + RemoteConnectionStrategy.ConnectionStrategy simple = RemoteConnectionStrategy.ConnectionStrategy.SIMPLE; + + String clusterName = "cluster_name"; + Settings settings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), simple.name()) + .build(); + + Set> clusterSettings = new HashSet<>(); + clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); + clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList())); + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + + // Should validate successfully + service.validate(settings, true); + + for (Tuple, String> restrictedSetting : restrictedSettings) { + Setting concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName); + Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true)); + String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " + + "\"cluster.remote.cluster_name.mode\" [required=SNIFF, configured=SIMPLE]"; + assertEquals(expected, iae.getMessage()); + } + } + private static List seedNodes(final DiscoveryNode... seedNodes) { return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java index 6ab511df90eed..fda2cf614c8bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/RealmSettings.java @@ -74,7 +74,7 @@ public static Setting.AffixSetting secureString(String realmType, * The {@code Function} takes the realm-type as an argument. * @param suffix The suffix of the setting (everything following the realm name in the affix setting) * @param delegateFactory A factory to produce the concrete setting. - * See {@link Setting#affixKeySetting(Setting.AffixKey, Function, Setting.AffixSetting[])} + * See {@link Setting#affixKeySetting(String, String, Function, Setting.AffixSetting[])} */ public static Function> affixSetting(String suffix, Function> delegateFactory) { return realmType -> Setting.affixKeySetting(realmSettingPrefix(realmType), suffix, delegateFactory); From 25ebe5f803b993e3c8da22c61d13a45b7934d884 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Nov 2019 15:11:02 -0700 Subject: [PATCH 07/10] Changes --- .../100_simple_connection_mode.yml | 33 +++++++++++++++++++ .../transport/RemoteClusterAware.java | 13 ++++++-- .../transport/RemoteConnectionStrategy.java | 10 +++--- .../transport/SimpleConnectionStrategy.java | 6 ++-- .../transport/SniffConnectionStrategy.java | 13 +++++--- .../70_simple_connection_mode.yml | 33 +++++++++++++++++++ 6 files changed, 93 insertions(+), 15 deletions(-) create mode 100644 qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml create mode 100644 x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml new file mode 100644 index 0000000000000..50334f399fc43 --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml @@ -0,0 +1,33 @@ +--- +"Add transient remote cluster using simple connection mode": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 2212525411101..be1ca9a1a2c43 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -102,9 +102,16 @@ void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) { * Registers this instance to listen to updates on the cluster settings. */ public void listenForUpdates(ClusterSettings clusterSettings) { - List> remoteClusterSettings = Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, - SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, - RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS); + List> remoteClusterSettings = Arrays.asList( + RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, + RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, + SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, + SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, + SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, + SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, + SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, + SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS); clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 4d11aebeefab5..2027e7f60037d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -333,16 +333,18 @@ static class StrategyValidator implements Setting.Validator { private final String key; private final ConnectionStrategy expectedStrategy; private final String namespace; + private final T disabledValue; private final Consumer valueChecker; - StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy) { - this(namespace, key, expectedStrategy, (v) -> {}); + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, T disabledValue) { + this(namespace, key, expectedStrategy, disabledValue, (v) -> {}); } - StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, Consumer valueChecker) { + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, T disabledValue, Consumer valueChecker) { this.namespace = namespace; this.key = key; this.expectedStrategy = expectedStrategy; + this.disabledValue = disabledValue; this.valueChecker = valueChecker; } @@ -355,7 +357,7 @@ public void validate(T value) { public void validate(T value, Map, Object> settings) { Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); ConnectionStrategy modeType = (ConnectionStrategy) settings.get(concrete); - if (modeType.equals(expectedStrategy) == false) { + if (value != null && value.equals(disabledValue) == false && modeType.equals(expectedStrategy) == false) { throw new IllegalArgumentException("Setting \"" + key + "\" cannot be used with the configured \"" + concrete.getKey() + "\" [required=" + expectedStrategy.name() + ", configured=" + modeType.name() + "]"); } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 0a50e77f6c334..6a1129a41169c 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -50,12 +50,12 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { */ public static final Setting.AffixSetting> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting( "cluster.remote.", - "addresses", + "simple.addresses", (ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> { // validate address parsePort(s); return s; - }, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), + }, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE, Collections.emptyList()), Setting.Property.Dynamic, Setting.Property.NodeScope)); /** @@ -64,7 +64,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "simple.socket_connections", - (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), + (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE, 18), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index ee56629ebf0aa..d23d47a0d6f0f 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -75,7 +75,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { parsePort(s); return s; }, - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, Collections.emptyList()), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -93,7 +93,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { return s; }, s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s), - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, Collections.emptyList()), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -109,7 +109,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { "proxy", (ns, key) -> Setting.simpleString( key, - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, s -> { + "", + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, "", s -> { if (Strings.hasLength(s)) { parsePort(s); } @@ -118,6 +119,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { Setting.Property.NodeScope), REMOTE_CLUSTER_SEEDS); + private static final int DEFAULT_CONNECTIONS_PER_CLUSTER = 3; + /** * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. @@ -125,7 +128,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = intSetting( "cluster.remote.connections_per_cluster", - 3, + DEFAULT_CONNECTIONS_PER_CLUSTER, 1, Setting.Property.NodeScope); /** @@ -139,7 +142,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { key, REMOTE_CONNECTIONS_PER_CLUSTER, 1, - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, DEFAULT_CONNECTIONS_PER_CLUSTER), Setting.Property.Dynamic, Setting.Property.NodeScope)); diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml new file mode 100644 index 0000000000000..50334f399fc43 --- /dev/null +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml @@ -0,0 +1,33 @@ +--- +"Add transient remote cluster using simple connection mode": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } From 05570756b5dddaa356fab697d9386b915ad4abfe Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Nov 2019 17:50:36 -0700 Subject: [PATCH 08/10] Chnages --- .../common/settings/Setting.java | 19 ++++++++++++++++--- .../transport/RemoteConnectionStrategy.java | 12 +++++------- .../transport/SimpleConnectionStrategy.java | 4 ++-- .../transport/SniffConnectionStrategy.java | 13 +++++-------- 4 files changed, 28 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 2a6b2f10999ff..2e6bfdc635cd2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -446,6 +446,7 @@ private T get(Settings settings, boolean validate) { } validator.validate(parsed); validator.validate(parsed, map); + validator.validate(parsed, map, exists(settings)); } return parsed; } catch (ElasticsearchParseException ex) { @@ -856,9 +857,9 @@ public Map getAsMap(Settings settings) { /** * Represents a validator for a setting. The {@link #validate(Object)} method is invoked early in the update setting process with the - * value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} method is invoked with the value of - * this setting and a map from the settings specified by {@link #settings()}} to their values. All these values come from the same - * {@link Settings} instance. + * value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} and + * {@link #validate(Object, Map, boolean)} methods are invoked with the value of this setting and a map from the settings specified by + * {@link #settings()}} to their values. All these values come from the same {@link Settings} instance. * * @param the type of the {@link Setting} */ @@ -882,6 +883,18 @@ public interface Validator { default void validate(T value, Map, Object> settings) { } + /** + * Validate this setting against its dependencies, specified by {@link #settings()}. This method allows validation logic + * to evaluate whether the setting will be present in the {@link Settings} after the update. The default implementation + * does nothing, accepting any value as valid as long as it passes the validation in {@link #validate(Object)}. + * + * @param value the value of this setting + * @param settings a map from the settings specified by {@link #settings()}} to their values + * @param isPresent boolean indicating if this setting is present + */ + default void validate(T value, Map, Object> settings, boolean isPresent) { + } + /** * The settings on which the validity of this setting depends. The values of the specified settings are passed to * {@link #validate(Object, Map)}. By default this returns an empty iterator, indicating that this setting does not depend on any diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 2027e7f60037d..d8a459a79a56e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -333,18 +333,16 @@ static class StrategyValidator implements Setting.Validator { private final String key; private final ConnectionStrategy expectedStrategy; private final String namespace; - private final T disabledValue; private final Consumer valueChecker; - StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, T disabledValue) { - this(namespace, key, expectedStrategy, disabledValue, (v) -> {}); + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy) { + this(namespace, key, expectedStrategy, (v) -> {}); } - StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, T disabledValue, Consumer valueChecker) { + StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, Consumer valueChecker) { this.namespace = namespace; this.key = key; this.expectedStrategy = expectedStrategy; - this.disabledValue = disabledValue; this.valueChecker = valueChecker; } @@ -354,10 +352,10 @@ public void validate(T value) { } @Override - public void validate(T value, Map, Object> settings) { + public void validate(T value, Map, Object> settings, boolean isPresent) { Setting concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace); ConnectionStrategy modeType = (ConnectionStrategy) settings.get(concrete); - if (value != null && value.equals(disabledValue) == false && modeType.equals(expectedStrategy) == false) { + if (isPresent && modeType.equals(expectedStrategy) == false) { throw new IllegalArgumentException("Setting \"" + key + "\" cannot be used with the configured \"" + concrete.getKey() + "\" [required=" + expectedStrategy.name() + ", configured=" + modeType.name() + "]"); } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 6a1129a41169c..839a1d19285b7 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -55,7 +55,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { // validate address parsePort(s); return s; - }, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE, Collections.emptyList()), + }, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); /** @@ -64,7 +64,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { public static final Setting.AffixSetting REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting( "cluster.remote.", "simple.socket_connections", - (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE, 18), + (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index d23d47a0d6f0f..ee56629ebf0aa 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -75,7 +75,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { parsePort(s); return s; }, - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, Collections.emptyList()), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -93,7 +93,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { return s; }, s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s), - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, Collections.emptyList()), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); @@ -109,8 +109,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { "proxy", (ns, key) -> Setting.simpleString( key, - "", - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, "", s -> { + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, s -> { if (Strings.hasLength(s)) { parsePort(s); } @@ -119,8 +118,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { Setting.Property.NodeScope), REMOTE_CLUSTER_SEEDS); - private static final int DEFAULT_CONNECTIONS_PER_CLUSTER = 3; - /** * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. @@ -128,7 +125,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = intSetting( "cluster.remote.connections_per_cluster", - DEFAULT_CONNECTIONS_PER_CLUSTER, + 3, 1, Setting.Property.NodeScope); /** @@ -142,7 +139,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { key, REMOTE_CONNECTIONS_PER_CLUSTER, 1, - new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, DEFAULT_CONNECTIONS_PER_CLUSTER), + new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF), Setting.Property.Dynamic, Setting.Property.NodeScope)); From edefed3a9ee2f7d56cfc4a492002893d0aa840d4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 21 Nov 2019 10:40:07 -0700 Subject: [PATCH 09/10] REST tests --- .../100_connection_mode_configuration.yml | 212 ++++++++++++++++++ .../100_simple_connection_mode.yml | 33 --- .../70_connection_mode_configuration.yml | 212 ++++++++++++++++++ .../70_simple_connection_mode.yml | 33 --- 4 files changed, 424 insertions(+), 66 deletions(-) create mode 100644 qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml delete mode 100644 qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml create mode 100644 x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml delete mode 100644 x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml new file mode 100644 index 0000000000000..ed639b3655ed5 --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml @@ -0,0 +1,212 @@ +--- +"Add transient remote cluster in simple mode with invalid sniff settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.node_connections: "5" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + +--- +"Add transient remote cluster in sniff mode with invalid simple settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.socket_connections: "20" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + +--- +"Add transient remote cluster using simple connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Add transient remote cluster using sniff connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "3" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Switch connection mode for configured cluster": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml deleted file mode 100644 index 50334f399fc43..0000000000000 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_simple_connection_mode.yml +++ /dev/null @@ -1,33 +0,0 @@ ---- -"Add transient remote cluster using simple connection mode": - - do: - cluster.get_settings: - include_defaults: true - - - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } - - - do: - cluster.put_settings: - flat_settings: true - body: - transient: - cluster.remote.test_remote_cluster.mode: "simple" - cluster.remote.test_remote_cluster.simple.socket_connections: "3" - cluster.remote.test_remote_cluster.simple.addresses: $remote_ip - - - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} - - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} - - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} - - - do: - search: - rest_total_hits_as_int: true - index: test_remote_cluster:test_index - - - is_false: num_reduce_phases - - match: {_clusters.total: 1} - - match: {_clusters.successful: 1} - - match: {_clusters.skipped: 0} - - match: { _shards.total: 3 } - - match: { hits.total: 6 } - - match: { hits.hits.0._index: "test_remote_cluster:test_index" } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml new file mode 100644 index 0000000000000..ed639b3655ed5 --- /dev/null +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml @@ -0,0 +1,212 @@ +--- +"Add transient remote cluster in simple mode with invalid sniff settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.node_connections: "5" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + +--- +"Add transient remote cluster in sniff mode with invalid simple settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.socket_connections: "20" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" } + +--- +"Add transient remote cluster using simple connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.socket_connections: "3" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Add transient remote cluster using sniff connection mode using valid settings": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "3" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + +--- +"Switch connection mode for configured cluster": + - do: + cluster.get_settings: + include_defaults: true + + - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } + + - do: + catch: bad_request + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: { status: 400 } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be + used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + search: + rest_total_hits_as_int: true + index: test_remote_cluster:test_index + + - is_false: num_reduce_phases + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 3 } + - match: { hits.total: 6 } + - match: { hits.hits.0._index: "test_remote_cluster:test_index" } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml deleted file mode 100644 index 50334f399fc43..0000000000000 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_simple_connection_mode.yml +++ /dev/null @@ -1,33 +0,0 @@ ---- -"Add transient remote cluster using simple connection mode": - - do: - cluster.get_settings: - include_defaults: true - - - set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip } - - - do: - cluster.put_settings: - flat_settings: true - body: - transient: - cluster.remote.test_remote_cluster.mode: "simple" - cluster.remote.test_remote_cluster.simple.socket_connections: "3" - cluster.remote.test_remote_cluster.simple.addresses: $remote_ip - - - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} - - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"} - - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} - - - do: - search: - rest_total_hits_as_int: true - index: test_remote_cluster:test_index - - - is_false: num_reduce_phases - - match: {_clusters.total: 1} - - match: {_clusters.successful: 1} - - match: {_clusters.skipped: 0} - - match: { _shards.total: 3 } - - match: { hits.total: 6 } - - match: { hits.hits.0._index: "test_remote_cluster:test_index" } From 1a55a1e0bf18b6ccc9e32a60990d1b2f4a0958eb Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 22 Nov 2019 09:54:23 -0700 Subject: [PATCH 10/10] Review changes --- .../transport/RemoteClusterConnection.java | 2 +- .../transport/RemoteClusterService.java | 35 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 2ab24ee089903..8b89e8f8f8905 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -238,7 +238,7 @@ ConnectionManager getConnectionManager() { return remoteConnectionManager.getConnectionManager(); } - public boolean shouldRebuildConnection(Settings newSettings) { + boolean shouldRebuildConnection(Settings newSettings) { return connectionStrategy.shouldRebuildConnection(newSettings); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index ba1084de80571..2bfe3980ed8d3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -226,22 +226,18 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski @Override protected void updateRemoteCluster(String clusterAlias, Settings settings) { - if (remoteClusters.containsKey(clusterAlias) == false) { - CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown)); + CountDownLatch latch = new CountDownLatch(1); + updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown)); - try { - // Wait 10 seconds for a new cluster. We must use a latch instead of a future because we - // are on the cluster state thread and our custom future implementation will throw an - // assertion. - if (latch.await(10, TimeUnit.SECONDS) == false) { - logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10)); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try { + // Wait 10 seconds for a connections. We must use a latch instead of a future because we + // are on the cluster state thread and our custom future implementation will throw an + // assertion. + if (latch.await(10, TimeUnit.SECONDS) == false) { + logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10)); } - } else { - updateRemoteCluster(clusterAlias, settings, noopListener); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @@ -269,13 +265,14 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, return; } - // this is a new cluster we have to add a new representation if (remote == null) { + // this is a new cluster we have to add a new representation Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); remoteClusters.put(clusterAlias, remote); + remote.ensureConnected(listener); } else if (remote.shouldRebuildConnection(newSettings)) { - // New ConnectionProfile. Must tear down existing connection + // Changes to connection configuration. Must tear down existing connection try { IOUtils.close(remote); } catch (IOException e) { @@ -285,9 +282,11 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); remoteClusters.put(clusterAlias, remote); + remote.ensureConnected(listener); + } else { + // No changes to connection configuration. + listener.onResponse(null); } - - remote.ensureConnected(listener); } /**