From 543de11fa62a14fc062f199ae41465cf88c9fa2c Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 18 Dec 2019 15:31:04 +0000 Subject: [PATCH 1/3] fix: pull queries should work across nodes Fixes: https://github.com/confluentinc/ksql/issues/4142 Fixes: https://github.com/confluentinc/ksql/issues/4151 Fixes: https://github.com/confluentinc/ksql/issues/4152 Introduces a new `inter.node.listener` that can be used to specify a URL that the node can be contacted on by other nodes. This can be different to the listeners defined in `listeners`. This can be required if `listeners` is set to a wildcard address, i.e. IPv4 `0.0.0.0` or IPv6 `[::]`, or if the node sits behind network infrastructure that requires other nodes to reach it using a different URL. If `inter.node.listener` is not set it still defaults to the first listener in `listener` config. However, it now replaces an wildcard address with `localHost`. This means inter-node comms is still possible for nodes running on the same host. Warnings are logged if the inter-node listener resolves to a loopback or local address. --- config/ksql-production-server.properties | 6 + config/ksql-server.properties | 6 + .../ksql/configdef/ConfigValidators.java | 14 + .../ksql/configdef/ConfigValidatorsTest.java | 25 + .../ksql/rest/server/KsqlRestApplication.java | 85 ++- .../ksql/rest/server/KsqlRestConfig.java | 209 +++++++- .../KsqlRestApplicationFunctionalTest.java | 107 ++++ .../rest/server/KsqlRestApplicationTest.java | 113 ++-- .../ksql/rest/server/KsqlRestConfigTest.java | 500 +++++++++++++++++- 9 files changed, 969 insertions(+), 96 deletions(-) create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java diff --git a/config/ksql-production-server.properties b/config/ksql-production-server.properties index 1c3943aa67d1..1b47c977d5ab 100644 --- a/config/ksql-production-server.properties +++ b/config/ksql-production-server.properties @@ -18,17 +18,23 @@ ### HTTP ### # The URL the KSQL server will listen on: # The default is any IPv4 interface on the machine. +# NOTE: If set to wildcard or loopback set 'inter.node.listener' to enable pull queries across machines listeners=http://0.0.0.0:8088 # Use the 'listeners' line below for any IPv6 interface on the machine. # listeners=http://[::]:8088 +# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address +# 'inter.node.listener' must be set to the URL other KSQL nodes should use to reach this node. +# inter.node.listener=? + ### HTTPS ### # To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above # uncomment and complete the properties below. # See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https # # listeners=https://0.0.0.0:8088 +# inter.node.listener=? # ssl.keystore.location=? # ssl.keystore.password=? # ssl.key.password=? diff --git a/config/ksql-server.properties b/config/ksql-server.properties index f11b09d8e24e..30e80276644d 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -18,17 +18,23 @@ ### HTTP ### # The URL the KSQL server will listen on: # The default is any IPv4 interface on the machine. +# NOTE: If set to wildcard or loopback set 'inter.node.listener' to enable pull queries across machines listeners=http://0.0.0.0:8088 # Use the 'listeners' line below for any IPv6 interface on the machine. # listeners=http://[::]:8088 +# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address +# 'inter.node.listener' must be set to the URL other KSQL nodes should use to reach this node. +# inter.node.listener=? + ### HTTPS ### # To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above # uncomment and complete the properties below. # See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https # # listeners=https://0.0.0.0:8088 +# inter.node.listener=? # ssl.keystore.location=? # ssl.keystore.password=? # ssl.key.password=? diff --git a/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java b/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java index db8075ef6f1a..3db0708f0bbd 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java +++ b/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java @@ -17,6 +17,7 @@ import static java.util.Objects.requireNonNull; +import java.net.URL; import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -78,6 +79,19 @@ public static > Validator enumValues(final Class enumClass) return ValidCaseInsensitiveString.in(validValues); } + public static Validator validUrl() { + return (name, val) -> { + if (!(val instanceof String)) { + throw new IllegalArgumentException("validator should only be used with STRING defs"); + } + try { + new URL((String)val); + } catch (Exception e) { + throw new ConfigException(name, val, "Not valid URL: " + e.getMessage()); + } + }; + } + public static final class ValidCaseInsensitiveString implements Validator { private final List validStrings; diff --git a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java index 52d2c5180796..b635e9a72f86 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java @@ -182,6 +182,31 @@ public void shouldThrowIfParserThrows() { validator.ensureValid("propName", "value"); } + @Test + public void shouldThrowOnInvalidURL() { + // Given: + final Validator validator = ConfigValidators.validUrl(); + + // Then: + expectedException.expect(ConfigException.class); + expectedException.expectMessage( + "Invalid value INVALID for configuration propName: Not valid URL: no protocol: INVALID"); + + // When: + validator.ensureValid("propName", "INVALID"); + } + + @Test + public void shouldNotThrowOnValidURL() { + // Given: + final Validator validator = ConfigValidators.validUrl(); + + // When: + validator.ensureValid("propName", "http://valid:25896/somePath"); + + // Then: did not throw. + } + private enum TestEnum { FOO, BAR } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 40b894b08d29..da1122fbd0a5 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server; import static io.confluent.ksql.rest.server.KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG; +import static io.confluent.rest.RestConfig.LISTENERS_CONFIG; import static java.util.Objects.requireNonNull; import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper; @@ -75,6 +76,7 @@ import io.confluent.ksql.services.SimpleKsqlClient; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.RetryUtil; import io.confluent.ksql.util.Version; import io.confluent.ksql.util.WelcomeMsgUtils; @@ -85,6 +87,7 @@ import java.io.Console; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.nio.charset.StandardCharsets; @@ -104,6 +107,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.websocket.DeploymentException; import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpointConfig; @@ -211,7 +215,7 @@ public void setupResources(final Configurable config, final KsqlRestConfig ap } @Override - public void startAsync() throws Exception { + public void startAsync() { log.info("KSQL RESTful API listening on {}", StringUtils.join(getListeners(), ", ")); final KsqlConfig ksqlConfigWithPort = buildConfigWithPort(); configurables.forEach(c -> c.configure(ksqlConfigWithPort)); @@ -321,23 +325,46 @@ public void onShutdown() { } List getListeners() { - return Arrays.stream(server.getConnectors()) - .filter(connector -> connector instanceof ServerConnector) - .map(ServerConnector.class::cast) - .map(connector -> { - try { - final String protocol = new HashSet<>(connector.getProtocols()) - .stream() - .map(String::toLowerCase) - .anyMatch(s -> s.equals("ssl")) ? "https" : "http"; - - final int localPort = connector.getLocalPort(); - - return new URL(protocol, "localhost", localPort, ""); - } catch (final Exception e) { - throw new RuntimeException("Malformed listener", e); - } - }) + final Function> resolvePort = url -> + Arrays.stream(server.getConnectors()) + .filter(connector -> connector instanceof ServerConnector) + .map(ServerConnector.class::cast) + .filter(connector -> { + final String connectorProtocol = connector.getProtocols().stream() + .map(String::toLowerCase) + .anyMatch(p -> p.equals("ssl")) ? "https" : "http"; + + return connectorProtocol.equalsIgnoreCase(url.getProtocol()); + }) + .map(ServerConnector::getLocalPort) + .collect(Collectors.toSet()); + + final Function> resolveUrl = listener -> { + try { + final URL url = new URL(listener); + if (url.getPort() != 0) { + return Stream.of(url); + } + + // Need to resolve port using actual listeners: + return resolvePort.apply(url).stream() + .map(port -> { + try { + return new URL(url.getProtocol(), url.getHost(), port, url.getFile()); + } catch (MalformedURLException e) { + throw new KsqlServerException("Malformed URL specified in '" + + LISTENERS_CONFIG + "' config: " + listener, e); + } + }); + } catch (MalformedURLException e) { + throw new KsqlServerException("Malformed URL specified in '" + + LISTENERS_CONFIG + "' config: " + listener, e); + } + }; + + return restConfig.getList(LISTENERS_CONFIG).stream() + .flatMap(resolveUrl) + .distinct() .collect(Collectors.toList()); } @@ -667,21 +694,31 @@ private static void maybeCreateProcessingLogStream( * * @return true server config. */ - private KsqlConfig buildConfigWithPort() { + @VisibleForTesting + KsqlConfig buildConfigWithPort() { final Map props = ksqlConfigNoPort.originals(); - // Wire up KS IQ endpoint discovery to the FIRST listener: - final URL firstListener = getListeners().get(0); + // Wire up KS IQ so that pull queries work across KSQL nodes: props.put( KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.APPLICATION_SERVER_CONFIG, - firstListener.toString() + restConfig.getInterNodeListener(this::resolvePort).toString() ); - log.info("Using first listener URL for intra-node communication: {}", firstListener); - return new KsqlConfig(props); } + private int resolvePort(final URL listener) { + return getListeners().stream() + .filter(l -> + l.getProtocol().equals(listener.getProtocol()) + && l.getHost().equals(listener.getHost()) + ) + .map(URL::getPort) + .findFirst() + .orElseThrow(() -> + new IllegalStateException("Failed resolve port for listener: " + listener)); + } + private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) { final Set authenticationSkipPaths = new HashSet<>( restConfig.getList(RestConfig.AUTHENTICATION_SKIP_PATHS) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index a2e0e97ea7e2..822e30e026a3 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -15,15 +15,30 @@ package io.confluent.ksql.rest.server; +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.configdef.ConfigValidators; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlServerException; import io.confluent.rest.RestConfig; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Function; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KsqlRestConfig extends RestConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(KsqlRestConfig.class); + private static final String KSQL_CONFIG_PREFIX = "ksql."; private static final String COMMAND_CONSUMER_PREFIX = @@ -31,6 +46,17 @@ public class KsqlRestConfig extends RestConfig { private static final String COMMAND_PRODUCER_PREFIX = KSQL_CONFIG_PREFIX + "server.command.producer."; + public static final String INTER_NODE_LISTENER_CONFIG = + KSQL_CONFIG_PREFIX + "inter.node.listener"; + private static final String INTER_NODE_LISTENER_DOC = + "The listener used for communication between KSQL nodes in the cluster, if different to the '" + + LISTENERS_CONFIG + "' config property. " + + "In IaaS environments, this may need to be different from the interface to which " + + "the server binds. If this is not set, the first value from listeners will be used. " + + "Unlike listeners, it is not valid to use the 0.0.0.0 (IPv4) or [::] (IPv6) " + + "wildcard addresses."; + private static final String USE_FIRST_LISTENER = "http://use.first.listener"; + static final String STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG = "query.stream.disconnect.check"; @@ -88,6 +114,13 @@ public class KsqlRestConfig extends RestConfig { static { CONFIG_DEF = baseConfigDef().define( + INTER_NODE_LISTENER_CONFIG, + Type.STRING, + USE_FIRST_LISTENER, + ConfigValidators.validUrl(), + Importance.HIGH, + INTER_NODE_LISTENER_DOC + ).define( STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG, Type.LONG, 1000L, @@ -141,10 +174,14 @@ public class KsqlRestConfig extends RestConfig { public KsqlRestConfig(final Map props) { super(CONFIG_DEF, props); - if (getList(RestConfig.LISTENERS_CONFIG).isEmpty()) { - throw new KsqlException(RestConfig.LISTENERS_CONFIG + " must be supplied. " - + RestConfig.LISTENERS_DOC); + + final List listeners = getList(LISTENERS_CONFIG); + if (listeners.isEmpty()) { + throw new KsqlException(LISTENERS_CONFIG + " must be supplied. " + LISTENERS_DOC); } + + listeners + .forEach(listener -> ConfigValidators.validUrl().ensureValid(LISTENERS_CONFIG, listener)); } // Bit of a hack to get around the fact that RestConfig.originals() is private for some reason @@ -169,4 +206,170 @@ Map getCommandProducerProperties() { public Map getKsqlConfigProperties() { return getOriginals(); } + + /** + * Determines which URL should be used to contact this node from other KSQL nodes. + * + *

Uses {@code INTER_NODE_LISTENER_CONFIG} by default, or first listener defined in + * {@code LISTENERS_CONFIG} if {@code INTER_NODE_LISTENER_CONFIG} not set. + * + *

Method takes a {@code portResolver} to resolve any auto-assigned port in + * {@code LISTENERS_CONFIG}, (i.e. port {@code 0}. + * + *

Any loopback or localhost in {@code LISTENERS_CONFIG} will be replaced with local machine + * name, though this is not guarenteed to work across machines in all circumstance. + * + * @param portResolver called to resolve port in first {@code LISTENERS_CONFIG} if {@code 0}. + * @return the resolved inter-node endpoint to use. + */ + public URL getInterNodeListener( + final Function portResolver + ) { + return getInterNodeListener(portResolver, LOGGER); + } + + @VisibleForTesting + URL getInterNodeListener( + final Function portResolver, + final Logger logger + ) { + return getString(INTER_NODE_LISTENER_CONFIG).equals(USE_FIRST_LISTENER) + ? getInterNodeListenerFromFirstListener(portResolver, logger) + : getInterNodeListenerFromExplicitConfig(logger); + } + + private URL getInterNodeListenerFromFirstListener( + final Function portResolver, + final Logger logger + ) { + final List configValue = getList(LISTENERS_CONFIG); + + final URL firstListener = parseUrl(configValue.get(0), LISTENERS_CONFIG); + + final InetAddress address = parseInetAddress(firstListener.getHost()) + .orElseThrow(() -> new ConfigException( + LISTENERS_CONFIG, + configValue, + "Could not resolve first host" + )); + + final URL listener = sanitizeInterNodeListener( + firstListener, + portResolver, + address.isAnyLocalAddress() + ); + + logInterNodeListener( + logger, + listener, + Optional.of(address), + "first '" + LISTENERS_CONFIG + "'" + ); + + return listener; + } + + private URL getInterNodeListenerFromExplicitConfig(final Logger logger) { + final String configValue = getString(INTER_NODE_LISTENER_CONFIG); + + final URL listener = parseUrl(configValue, INTER_NODE_LISTENER_CONFIG); + + if (listener.getPort() <= 0) { + throw new ConfigException(INTER_NODE_LISTENER_CONFIG, configValue, "Must have valid port"); + } + + // Valid for address to not be resolvable, as may be _externally_ resolvable: + final Optional address = parseInetAddress(listener.getHost()); + + address.ifPresent(a -> { + if (a.isAnyLocalAddress()) { + throw new ConfigException(INTER_NODE_LISTENER_CONFIG, configValue, "Can not be wildcard"); + } + }); + + logInterNodeListener( + logger, + listener, + address, + "'" + INTER_NODE_LISTENER_CONFIG + "'" + ); + + return listener; + } + + private static void logInterNodeListener( + final Logger logger, + final URL listener, + final Optional address, + final String sourceConfigName + ) { + address.ifPresent(a -> { + if (a.isLoopbackAddress()) { + logger.warn( + "{} config is set to a loopback address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + sourceConfigName, listener + ); + } + + if (a.isAnyLocalAddress()) { + logger.warn( + "{} config uses wildcard address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + sourceConfigName, listener + ); + } + }); + + logger.info("Using {} config for intra-node communication: {}", sourceConfigName, listener); + } + + private static URL sanitizeInterNodeListener( + final URL listener, + final Function portResolver, + final boolean replaceHost + ) { + if (!replaceHost && listener.getPort() > 0) { + return listener; + } + + final String host = replaceHost + ? getLocalHostName() + : listener.getHost(); + + final int port = listener.getPort() == 0 + ? portResolver.apply(listener) + : listener.getPort(); + + try { + return new URL(listener.getProtocol(), host, port, listener.getFile()); + } catch (final MalformedURLException e) { + throw new KsqlServerException("Resolved first listener to malformed URL", e); + } + } + + private static URL parseUrl(final String address, final String configName) { + try { + return new URL(address); + } catch (final MalformedURLException e) { + throw new ConfigException(configName, address, e.getMessage()); + } + } + + + private static Optional parseInetAddress(final String address) { + try { + return Optional.of(InetAddress.getByName(address)); + } catch (final UnknownHostException e) { + return Optional.empty(); + } + } + + private static String getLocalHostName() { + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + throw new KsqlServerException("Failed to obtain local host info", e); + } + } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java new file mode 100644 index 000000000000..4703dbb0d10d --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; +import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent; +import io.confluent.rest.RestConfig; +import java.net.URL; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.CommonClientConfigs; +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Test; + +public class KsqlRestApplicationFunctionalTest { + + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = + EmbeddedSingleNodeKafkaCluster.build(); + + private KsqlRestApplication app; + + @After + public void tearDown() throws Exception { + if (app != null) { + app.stop(); + } + } + + @Test + public void shouldResolveListenersWithExplicitAndAutoAssignedPorts() { + // Given: + givenAppStartedWith(ImmutableMap + .of(RestConfig.LISTENERS_CONFIG, "http://127.0.0.1:0,https://localHost:8088")); + + // When: + final List listeners = app.getListeners(); + + // Then: + assertThat(listeners, hasSize(2)); + assertThat(listeners.get(0).getProtocol(), is("http")); + assertThat(listeners.get(0).getHost(), is("127.0.0.1")); + assertThat(listeners.get(0).getPort(), is(not(0))); + assertThat(listeners.get(1).getProtocol(), is("https")); + assertThat(listeners.get(1).getHost(), is("localhost")); + assertThat(listeners.get(1).getPort(), is(8088)); + } + + @Test + public void shouldResolveMultipleListenersPerProtocol() { + // Given: + givenAppStartedWith(ImmutableMap + .of(RestConfig.LISTENERS_CONFIG, "http://localhost:0,http://localhost:0")); + + // When: + final List listeners = app.getListeners(); + + // Then: + assertThat(listeners, hasSize(2)); + assertThat(listeners.get(0).getProtocol(), is("http")); + assertThat(listeners.get(0).getHost(), is("localhost")); + assertThat(listeners.get(0).getPort(), is(not(0))); + assertThat(listeners.get(1).getProtocol(), is("http")); + assertThat(listeners.get(1).getHost(), is("localhost")); + assertThat(listeners.get(1).getPort(), is(not(0))); + } + + private void givenAppStartedWith(final Map config) { + + final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() + .putAll(config) + .put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) + .build() + ); + + app = KsqlRestApplication.buildApplication( + restConfig, + KsqlVersionCheckerAgent::new + ); + + try { + app.start(); + } catch (Exception e) { + throw new AssertionError("Failed to start", e); + } + } +} \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 712fa7fe61bd..9da5dc5b89e1 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -15,6 +15,8 @@ package io.confluent.ksql.rest.server; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; @@ -28,20 +30,16 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import io.confluent.ksql.KsqlExecutionContext; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.logging.processing.ProcessingLogContext; -import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; -import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; -import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.CommandStore; -import io.confluent.ksql.rest.server.computation.QueuedCommandStatus; import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; import io.confluent.ksql.rest.server.resources.KsqlResource; @@ -59,13 +57,16 @@ import io.confluent.rest.RestConfig; import java.util.Collections; import java.util.LinkedList; +import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.function.Consumer; import javax.ws.rs.core.Configurable; -import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.streams.StreamsConfig; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.InOrder; import org.mockito.Mock; @@ -78,24 +79,16 @@ public class KsqlRestApplicationTest { private static final String LOG_TOPIC_NAME = "log_topic"; private static final String CMD_TOPIC_NAME = "command_topic"; - private final KsqlRestConfig restConfig = - new KsqlRestConfig( - Collections.singletonMap(RestConfig.LISTENERS_CONFIG, - "http://localhost:8088")); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); @Mock private ServiceContext serviceContext; @Mock private KsqlEngine ksqlEngine; @Mock - private MetaStore metaStore; - @Mock - private KsqlExecutionContext sandBox; - @Mock private KsqlConfig ksqlConfig; @Mock - private KsqlRestConfig ksqlRestConfig; - @Mock private ProcessingLogConfig processingLogConfig; @Mock private CommandRunner commandRunner; @@ -112,8 +105,6 @@ public class KsqlRestApplicationTest { @Mock private CommandStore commandQueue; @Mock - private QueuedCommandStatus queuedCommandStatus; - @Mock private KsqlSecurityExtension securityExtension; @Mock private ProcessingLogContext processingLogContext; @@ -131,10 +122,10 @@ public class KsqlRestApplicationTest { private PreparedStatement preparedStatement; @Mock private Consumer rocksDBConfigSetterHandler; - @Mock - private Producer transactionalProducer; + private String logCreateStatement; private KsqlRestApplication app; + private KsqlRestConfig restConfig; @SuppressWarnings("unchecked") @Before @@ -163,26 +154,7 @@ public void setUp() { ksqlConfig ); - app = new KsqlRestApplication( - serviceContext, - ksqlEngine, - ksqlConfig, - restConfig, - commandRunner, - commandQueue, - rootDocument, - statusResource, - streamedQueryResource, - ksqlResource, - versionCheckerAgent, - KsqlRestServiceContextBinder::new, - securityExtension, - serverState, - processingLogContext, - ImmutableList.of(precondition1, precondition2), - ImmutableList.of(ksqlResource, streamedQueryResource), - rocksDBConfigSetterHandler - ); + givenAppWithRestConfig(ImmutableMap.of(RestConfig.LISTENERS_CONFIG, "http://localhost:0")); } @Test @@ -378,4 +350,65 @@ public void shouldConfigureRocksDBConfigSetter() { // Then: verify(rocksDBConfigSetterHandler).accept(ksqlConfig); } + + @Test + public void shouldConfigureIQWithInterNodeListenerIfSet() { + // Given: + givenAppWithRestConfig(ImmutableMap.of( + RestConfig.LISTENERS_CONFIG, "http://localhost:0", + KsqlRestConfig.INTER_NODE_LISTENER_CONFIG, "https://some.host:12345" + )); + + // When: + final KsqlConfig ksqlConfig = app.buildConfigWithPort(); + + // Then: + assertThat( + ksqlConfig.getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG), + is("https://some.host:12345") + ); + } + + @Test + public void shouldConfigureIQWithFirstListenerIfInterNodeNotSet() { + // Given: + givenAppWithRestConfig(ImmutableMap.of( + RestConfig.LISTENERS_CONFIG, "http://some.host:1244,https://some.other.host:1258" + )); + + // When: + final KsqlConfig ksqlConfig = app.buildConfigWithPort(); + + // Then: + assertThat( + ksqlConfig.getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG), + is("http://some.host:1244") + ); + } + + private void givenAppWithRestConfig(final Map restConfigMap) { + + restConfig = new KsqlRestConfig(restConfigMap); + + app = new KsqlRestApplication( + serviceContext, + ksqlEngine, + ksqlConfig, + restConfig, + commandRunner, + commandQueue, + rootDocument, + statusResource, + streamedQueryResource, + ksqlResource, + versionCheckerAgent, + KsqlRestServiceContextBinder::new, + securityExtension, + serverState, + processingLogContext, + ImmutableList.of(precondition1, precondition2), + ImmutableList.of(ksqlResource, streamedQueryResource), + rocksDBConfigSetterHandler + ); + } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java index ce0249253740..2884c550190e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java @@ -16,60 +16,502 @@ package io.confluent.ksql.rest.server; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; +import static io.confluent.ksql.rest.server.KsqlRestConfig.INTER_NODE_LISTENER_CONFIG; +import static io.confluent.rest.RestConfig.LISTENERS_CONFIG; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.rest.RestConfig; -import java.util.HashMap; +import java.net.MalformedURLException; +import java.net.URL; import java.util.Map; +import java.util.function.Function; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.streams.StreamsConfig; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +@RunWith(MockitoJUnitRunner.class) public class KsqlRestConfigTest { - private static final Map MIN_VALID_CONFIGS = ImmutableMap.builder() + private static final Map MIN_VALID_CONFIGS = ImmutableMap.builder() .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") - .put(RestConfig.LISTENERS_CONFIG, "http://localhost:8088") + .put(LISTENERS_CONFIG, "http://localhost:8088") .build(); - @Test - public void testGetKsqlConfigProperties() { - final Map inputProperties = new HashMap<>(MIN_VALID_CONFIGS); - inputProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - inputProperties.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test"); + private static final String QUOTED_INTER_NODE_LISTENER_CONFIG = + "'" + INTER_NODE_LISTENER_CONFIG + "'"; + + private static final String QUOTED_FIRST_LISTENER_CONFIG = + "first '" + LISTENERS_CONFIG + "'"; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private Function portResolver; + @Mock + private Logger logger; - final KsqlRestConfig config = new KsqlRestConfig(inputProperties); + @Test + public void shouldGetKsqlConfigProperties() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test") + .build() + ); + // When: final Map ksqlConfigProperties = config.getKsqlConfigProperties(); - final Map expectedKsqlConfigProperties = new HashMap<>(); - expectedKsqlConfigProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - expectedKsqlConfigProperties.put(RestConfig.LISTENERS_CONFIG, "http://localhost:8088"); - expectedKsqlConfigProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - expectedKsqlConfigProperties.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test"); - assertThat(ksqlConfigProperties, equalTo(expectedKsqlConfigProperties)); + + // Then: + assertThat(ksqlConfigProperties, is(ImmutableMap.of( + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", + LISTENERS_CONFIG, "http://localhost:8088", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", + KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test")) + ); } // Just a sanity check to make sure that, although they contain identical mappings, successive maps returned by calls // to KsqlRestConfig.getOriginals() do not actually return the same object (mutability would then be an issue) @Test - public void testOriginalsReplicability() { - final String COMMIT_INTERVAL_MS = "10"; - - final Map inputProperties = new HashMap<>(MIN_VALID_CONFIGS); - inputProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); - final KsqlRestConfig config = new KsqlRestConfig(inputProperties); + public void shouldReturnDifferentMapOnEachCallToOriginals() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10") + .build() + ); final Map originals1 = config.getOriginals(); final Map originals2 = config.getOriginals(); - assertEquals(originals1, originals2); - assertNotSame(originals1, originals2); - assertEquals(COMMIT_INTERVAL_MS, originals1.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); - assertEquals(COMMIT_INTERVAL_MS, originals2.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + // When: + originals1.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "99"); + + // Then: + assertThat(originals2.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), is("10")); + } + + @Test + public void shouldThrowIfAnyListenerIsInvalidUrl() { + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value INVALID for configuration " + + LISTENERS_CONFIG + + ": Not valid URL: no protocol: INVALID" + ); + + // Given: + new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, "http://localhost:9875,INVALID") + .build() + ); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerIsInvalidUrl() { + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value INVALID for configuration " + + INTER_NODE_LISTENER_CONFIG + + ": Not valid URL: no protocol: INVALID" + ); + + // Given: + new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, "INVALID") + .build() + ); + } + + @Test + public void shouldUseExplicitInterNodeListenerSetToUnresolvableHost() { + // Given: + final URL expected = url("https://unresolvable.host:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerSetToResolvableHost() { + // Given: + final URL expected = url("https://example.com:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerIfSetToLocalHost() { + // Given: + final URL expected = url("https://localHost:52368"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerIfSetToIpv4Loopback() { + // Given: + final URL expected = url("https://127.0.0.2:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerIfSetToIpv6Loopback() { + // Given: + final URL expected = url("https://[::1]:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerHasAutoPortAssignment() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, "https://unresolvable.host:0") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value https://unresolvable.host:0 for configuration " + + INTER_NODE_LISTENER_CONFIG + + ": Must have valid port" + ); + + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerHasIpv4WildcardAddress() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, "https://0.0.0.0:12589") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value https://0.0.0.0:12589 for configuration " + + INTER_NODE_LISTENER_CONFIG + + ": Can not be wildcard" + ); + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerHasIpv6WildcardAddress() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(INTER_NODE_LISTENER_CONFIG, "https://[::]:1236") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value https://[::]:1236 for configuration " + + INTER_NODE_LISTENER_CONFIG + + ": Can not be wildcard" + ); + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldThrowIfOnGetInterNodeListenerIfFirstListenerSetToUnresolvableHost() { + // Given: + final URL expected = url("https://unresolvable.host:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value " + + "[https://unresolvable.host:12345, http://localhost:2589] for configuration " + + LISTENERS_CONFIG + + ": Could not resolve first host" + ); + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToResolvableHost() { + // Given: + final URL expected = url("https://example.com:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToLocalHost() { + // Given: + final URL expected = url("https://localHost:52368"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToIpv4Loopback() { + // Given: + final URL expected = url("https://127.0.0.2:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToIpv6Loopback() { + // Given: + final URL expected = url("https://[::1]:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerWithAutoPortAssignment() { + // Given: + final URL autoPort = url("https://example.com:0"); + + when(portResolver.apply(any())).thenReturn(2222); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, autoPort.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + final URL expected = url("https://example.com:2222"); + + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerWithIpv4WildcardAddress() { + // Given: + final URL wildcard = url("https://0.0.0.0:12589"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, wildcard.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + final URL expected = url("https://localhost:12589"); + + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsWildcardWarning(expected); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerWithIpv6WildcardAddress() { + // Given: + final URL wildcard = url("https://[::]:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, wildcard.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + final URL expected = url("https://localhost:12345"); + + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsWildcardWarning(expected); + verifyNoMoreInteractions(logger); + } + + private void verifyLogsInterNodeListener(final URL listener, final String sourceConfig) { + verify(logger).info( + "Using {} config for intra-node communication: {}", + sourceConfig, + listener + ); + } + + private void verifyLogsLoopBackWarning(final URL listener, final String sourceConfig) { + verify(logger).warn( + "{} config is set to a loopback address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + sourceConfig, + listener + ); + } + + private void verifyLogsWildcardWarning(final URL listener) { + verify(logger).warn( + "{} config uses wildcard address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + QUOTED_FIRST_LISTENER_CONFIG, + listener + ); + } + + private static URL url(final String address) { + try { + return new URL(address); + } catch (final MalformedURLException e) { + throw new AssertionError("Invalid URL in test: " + address, e); + } } } From 2941e84d11e1a8e095b0da96ab946c59b81509a9 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 19 Dec 2019 11:07:30 +0000 Subject: [PATCH 2/3] chore: requested changes --- config/ksql-production-server.properties | 8 ++-- config/ksql-server.properties | 8 ++-- .../ksql/rest/server/KsqlRestConfig.java | 46 ++++++++++++------- .../rest/server/KsqlRestApplicationTest.java | 2 +- .../ksql/rest/server/KsqlRestConfigTest.java | 30 ++++++------ 5 files changed, 54 insertions(+), 40 deletions(-) diff --git a/config/ksql-production-server.properties b/config/ksql-production-server.properties index 1b47c977d5ab..f4590a5d78e5 100644 --- a/config/ksql-production-server.properties +++ b/config/ksql-production-server.properties @@ -18,15 +18,15 @@ ### HTTP ### # The URL the KSQL server will listen on: # The default is any IPv4 interface on the machine. -# NOTE: If set to wildcard or loopback set 'inter.node.listener' to enable pull queries across machines +# NOTE: If set to wildcard or loopback set 'advertised.listener' to enable pull queries across machines listeners=http://0.0.0.0:8088 # Use the 'listeners' line below for any IPv6 interface on the machine. # listeners=http://[::]:8088 # If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address -# 'inter.node.listener' must be set to the URL other KSQL nodes should use to reach this node. -# inter.node.listener=? +# 'advertised.listener' must be set to the URL other KSQL nodes should use to reach this node. +# advertised.listener=? ### HTTPS ### # To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above @@ -34,7 +34,7 @@ listeners=http://0.0.0.0:8088 # See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https # # listeners=https://0.0.0.0:8088 -# inter.node.listener=? +# advertised.listener=? # ssl.keystore.location=? # ssl.keystore.password=? # ssl.key.password=? diff --git a/config/ksql-server.properties b/config/ksql-server.properties index 30e80276644d..1737ca0ba58a 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -18,15 +18,15 @@ ### HTTP ### # The URL the KSQL server will listen on: # The default is any IPv4 interface on the machine. -# NOTE: If set to wildcard or loopback set 'inter.node.listener' to enable pull queries across machines +# NOTE: If set to wildcard or loopback set 'advertised.listener' to enable pull queries across machines listeners=http://0.0.0.0:8088 # Use the 'listeners' line below for any IPv6 interface on the machine. # listeners=http://[::]:8088 # If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address -# 'inter.node.listener' must be set to the URL other KSQL nodes should use to reach this node. -# inter.node.listener=? +# 'advertised.listener' must be set to the URL other KSQL nodes should use to reach this node. +# advertised.listener=? ### HTTPS ### # To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above @@ -34,7 +34,7 @@ listeners=http://0.0.0.0:8088 # See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https # # listeners=https://0.0.0.0:8088 -# inter.node.listener=? +# advertised.listener=? # ssl.keystore.location=? # ssl.keystore.password=? # ssl.key.password=? diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 56ff566fc40f..398e23bb000b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -43,21 +43,20 @@ public class KsqlRestConfig extends RestConfig { private static final String KSQL_CONFIG_PREFIX = "ksql."; - private static final String COMMAND_CONSUMER_PREFIX = + private static final String COMMAND_CONSUMER_PREFIX = KSQL_CONFIG_PREFIX + "server.command.consumer."; - private static final String COMMAND_PRODUCER_PREFIX = + private static final String COMMAND_PRODUCER_PREFIX = KSQL_CONFIG_PREFIX + "server.command.producer."; - public static final String INTER_NODE_LISTENER_CONFIG = - KSQL_CONFIG_PREFIX + "inter.node.listener"; - private static final String INTER_NODE_LISTENER_DOC = + public static final String ADVERTISED_LISTENER_CONFIG = + KSQL_CONFIG_PREFIX + "advertised.listener"; + private static final String ADVERTISED_LISTENER_DOC = "The listener used for communication between KSQL nodes in the cluster, if different to the '" + LISTENERS_CONFIG + "' config property. " + "In IaaS environments, this may need to be different from the interface to which " + "the server binds. If this is not set, the first value from listeners will be used. " + "Unlike listeners, it is not valid to use the 0.0.0.0 (IPv4) or [::] (IPv6) " + "wildcard addresses."; - private static final String USE_FIRST_LISTENER = "http://use.first.listener"; static final String STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG = "query.stream.disconnect.check"; @@ -122,12 +121,12 @@ public class KsqlRestConfig extends RestConfig { static { CONFIG_DEF = baseConfigDef().define( - INTER_NODE_LISTENER_CONFIG, + ADVERTISED_LISTENER_CONFIG, Type.STRING, - USE_FIRST_LISTENER, - ConfigValidators.validUrl(), + null, + ConfigValidators.nullsAllowed(ConfigValidators.validUrl()), Importance.HIGH, - INTER_NODE_LISTENER_DOC + ADVERTISED_LISTENER_DOC ).define( STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG, Type.LONG, @@ -247,7 +246,7 @@ URL getInterNodeListener( final Function portResolver, final Logger logger ) { - return getString(INTER_NODE_LISTENER_CONFIG).equals(USE_FIRST_LISTENER) + return getString(ADVERTISED_LISTENER_CONFIG) == null ? getInterNodeListenerFromFirstListener(portResolver, logger) : getInterNodeListenerFromExplicitConfig(logger); } @@ -284,12 +283,12 @@ private URL getInterNodeListenerFromFirstListener( } private URL getInterNodeListenerFromExplicitConfig(final Logger logger) { - final String configValue = getString(INTER_NODE_LISTENER_CONFIG); + final String configValue = getString(ADVERTISED_LISTENER_CONFIG); - final URL listener = parseUrl(configValue, INTER_NODE_LISTENER_CONFIG); + final URL listener = parseUrl(configValue, ADVERTISED_LISTENER_CONFIG); if (listener.getPort() <= 0) { - throw new ConfigException(INTER_NODE_LISTENER_CONFIG, configValue, "Must have valid port"); + throw new ConfigException(ADVERTISED_LISTENER_CONFIG, configValue, "Must have valid port"); } // Valid for address to not be resolvable, as may be _externally_ resolvable: @@ -297,7 +296,7 @@ private URL getInterNodeListenerFromExplicitConfig(final Logger logger) { address.ifPresent(a -> { if (a.isAnyLocalAddress()) { - throw new ConfigException(INTER_NODE_LISTENER_CONFIG, configValue, "Can not be wildcard"); + throw new ConfigException(ADVERTISED_LISTENER_CONFIG, configValue, "Can not be wildcard"); } }); @@ -305,7 +304,7 @@ private URL getInterNodeListenerFromExplicitConfig(final Logger logger) { logger, listener, address, - "'" + INTER_NODE_LISTENER_CONFIG + "'" + "'" + ADVERTISED_LISTENER_CONFIG + "'" ); return listener; @@ -338,6 +337,21 @@ private static void logInterNodeListener( logger.info("Using {} config for intra-node communication: {}", sourceConfigName, listener); } + /** + * Used to sanitize the first `listener` config. + * + *

It will: + *

    + *
  • resolve any auto-port assignment to the actual port the server is listening on
  • + *
  • potentially, replace the host with localhost. This can be useful where the first + * listener is a wildcard address, e.g. {@code 0.0.0.0}/li> + *
+ * + * @param listener the URL to sanitize + * @param portResolver the function to call to resolve the port. + * @param replaceHost flag indicating if the host in the URL should be replaced with localhost. + * @return the sanitized URL. + */ private static URL sanitizeInterNodeListener( final URL listener, final Function portResolver, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 9da5dc5b89e1..cedc84acb5e2 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -356,7 +356,7 @@ public void shouldConfigureIQWithInterNodeListenerIfSet() { // Given: givenAppWithRestConfig(ImmutableMap.of( RestConfig.LISTENERS_CONFIG, "http://localhost:0", - KsqlRestConfig.INTER_NODE_LISTENER_CONFIG, "https://some.host:12345" + KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "https://some.host:12345" )); // When: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java index 2884c550190e..a56d3e3048d6 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java @@ -16,7 +16,7 @@ package io.confluent.ksql.rest.server; -import static io.confluent.ksql.rest.server.KsqlRestConfig.INTER_NODE_LISTENER_CONFIG; +import static io.confluent.ksql.rest.server.KsqlRestConfig.ADVERTISED_LISTENER_CONFIG; import static io.confluent.rest.RestConfig.LISTENERS_CONFIG; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -51,7 +51,7 @@ public class KsqlRestConfigTest { .build(); private static final String QUOTED_INTER_NODE_LISTENER_CONFIG = - "'" + INTER_NODE_LISTENER_CONFIG + "'"; + "'" + ADVERTISED_LISTENER_CONFIG + "'"; private static final String QUOTED_FIRST_LISTENER_CONFIG = "first '" + LISTENERS_CONFIG + "'"; @@ -129,14 +129,14 @@ public void shouldThrowIfExplicitInterNodeListenerIsInvalidUrl() { // Expect: expectedException.expect(ConfigException.class); expectedException.expectMessage("Invalid value INVALID for configuration " - + INTER_NODE_LISTENER_CONFIG + + ADVERTISED_LISTENER_CONFIG + ": Not valid URL: no protocol: INVALID" ); // Given: new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, "INVALID") + .put(ADVERTISED_LISTENER_CONFIG, "INVALID") .build() ); } @@ -148,7 +148,7 @@ public void shouldUseExplicitInterNodeListenerSetToUnresolvableHost() { final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) .build() ); @@ -168,7 +168,7 @@ public void shouldUseExplicitInterNodeListenerSetToResolvableHost() { final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) .build() ); @@ -188,7 +188,7 @@ public void shouldUseExplicitInterNodeListenerIfSetToLocalHost() { final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) .build() ); @@ -209,7 +209,7 @@ public void shouldUseExplicitInterNodeListenerIfSetToIpv4Loopback() { final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) .build() ); @@ -230,7 +230,7 @@ public void shouldUseExplicitInterNodeListenerIfSetToIpv6Loopback() { final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, expected.toString()) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) .build() ); @@ -249,14 +249,14 @@ public void shouldThrowIfExplicitInterNodeListenerHasAutoPortAssignment() { // Given: final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, "https://unresolvable.host:0") + .put(ADVERTISED_LISTENER_CONFIG, "https://unresolvable.host:0") .build() ); // Expect: expectedException.expect(ConfigException.class); expectedException.expectMessage("Invalid value https://unresolvable.host:0 for configuration " - + INTER_NODE_LISTENER_CONFIG + + ADVERTISED_LISTENER_CONFIG + ": Must have valid port" ); @@ -270,14 +270,14 @@ public void shouldThrowIfExplicitInterNodeListenerHasIpv4WildcardAddress() { // Given: final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, "https://0.0.0.0:12589") + .put(ADVERTISED_LISTENER_CONFIG, "https://0.0.0.0:12589") .build() ); // Expect: expectedException.expect(ConfigException.class); expectedException.expectMessage("Invalid value https://0.0.0.0:12589 for configuration " - + INTER_NODE_LISTENER_CONFIG + + ADVERTISED_LISTENER_CONFIG + ": Can not be wildcard" ); @@ -290,14 +290,14 @@ public void shouldThrowIfExplicitInterNodeListenerHasIpv6WildcardAddress() { // Given: final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() .putAll(MIN_VALID_CONFIGS) - .put(INTER_NODE_LISTENER_CONFIG, "https://[::]:1236") + .put(ADVERTISED_LISTENER_CONFIG, "https://[::]:1236") .build() ); // Expect: expectedException.expect(ConfigException.class); expectedException.expectMessage("Invalid value https://[::]:1236 for configuration " - + INTER_NODE_LISTENER_CONFIG + + ADVERTISED_LISTENER_CONFIG + ": Can not be wildcard" ); From cf8b087606ee0b062372f834e95a75c4364bbdf3 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 19 Dec 2019 12:58:15 +0000 Subject: [PATCH 3/3] chore: fix tests --- .../server/KsqlRestApplicationFunctionalTest.java | 2 +- .../ksql/rest/server/KsqlRestConfigTest.java | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java index 4703dbb0d10d..6f44fb7f4e38 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java @@ -62,7 +62,7 @@ public void shouldResolveListenersWithExplicitAndAutoAssignedPorts() { assertThat(listeners.get(0).getHost(), is("127.0.0.1")); assertThat(listeners.get(0).getPort(), is(not(0))); assertThat(listeners.get(1).getProtocol(), is("https")); - assertThat(listeners.get(1).getHost(), is("localhost")); + assertThat(listeners.get(1).getHost(), is("localHost")); assertThat(listeners.get(1).getPort(), is(8088)); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java index a56d3e3048d6..f16e20d6133e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java @@ -27,8 +27,10 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.util.KsqlConfig; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; +import java.net.UnknownHostException; import java.util.Map; import java.util.function.Function; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -450,7 +452,7 @@ public void shouldResolveInterNodeListenerToFirstListenerWithIpv4WildcardAddress final URL actual = config.getInterNodeListener(portResolver, logger); // Then: - final URL expected = url("https://localhost:12589"); + final URL expected = url("https://" + getLocalHostName() + ":12589"); assertThat(actual, is(expected)); verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); @@ -473,7 +475,7 @@ public void shouldResolveInterNodeListenerToFirstListenerWithIpv6WildcardAddress final URL actual = config.getInterNodeListener(portResolver, logger); // Then: - final URL expected = url("https://localhost:12345"); + final URL expected = url("https://" + getLocalHostName() + ":12345"); assertThat(actual, is(expected)); verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); @@ -514,4 +516,12 @@ private static URL url(final String address) { throw new AssertionError("Invalid URL in test: " + address, e); } } + + private static String getLocalHostName() { + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + throw new AssertionError("Failed to obtain local host info", e); + } + } }