diff --git a/config/ksql-production-server.properties b/config/ksql-production-server.properties index 1c3943aa67d1..f4590a5d78e5 100644 --- a/config/ksql-production-server.properties +++ b/config/ksql-production-server.properties @@ -18,17 +18,23 @@ ### HTTP ### # The URL the KSQL server will listen on: # The default is any IPv4 interface on the machine. +# NOTE: If set to wildcard or loopback set 'advertised.listener' to enable pull queries across machines listeners=http://0.0.0.0:8088 # Use the 'listeners' line below for any IPv6 interface on the machine. # listeners=http://[::]:8088 +# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address +# 'advertised.listener' must be set to the URL other KSQL nodes should use to reach this node. +# advertised.listener=? + ### HTTPS ### # To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above # uncomment and complete the properties below. # See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https # # listeners=https://0.0.0.0:8088 +# advertised.listener=? # ssl.keystore.location=? # ssl.keystore.password=? # ssl.key.password=? diff --git a/config/ksql-server.properties b/config/ksql-server.properties index f11b09d8e24e..1737ca0ba58a 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -18,17 +18,23 @@ ### HTTP ### # The URL the KSQL server will listen on: # The default is any IPv4 interface on the machine. +# NOTE: If set to wildcard or loopback set 'advertised.listener' to enable pull queries across machines listeners=http://0.0.0.0:8088 # Use the 'listeners' line below for any IPv6 interface on the machine. # listeners=http://[::]:8088 +# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address +# 'advertised.listener' must be set to the URL other KSQL nodes should use to reach this node. +# advertised.listener=? + ### HTTPS ### # To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above # uncomment and complete the properties below. # See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https # # listeners=https://0.0.0.0:8088 +# advertised.listener=? # ssl.keystore.location=? # ssl.keystore.password=? # ssl.key.password=? diff --git a/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java b/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java index db8075ef6f1a..3db0708f0bbd 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java +++ b/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java @@ -17,6 +17,7 @@ import static java.util.Objects.requireNonNull; +import java.net.URL; import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -78,6 +79,19 @@ public static > Validator enumValues(final Class enumClass) return ValidCaseInsensitiveString.in(validValues); } + public static Validator validUrl() { + return (name, val) -> { + if (!(val instanceof String)) { + throw new IllegalArgumentException("validator should only be used with STRING defs"); + } + try { + new URL((String)val); + } catch (Exception e) { + throw new ConfigException(name, val, "Not valid URL: " + e.getMessage()); + } + }; + } + public static final class ValidCaseInsensitiveString implements Validator { private final List validStrings; diff --git a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java index 52d2c5180796..b635e9a72f86 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java @@ -182,6 +182,31 @@ public void shouldThrowIfParserThrows() { validator.ensureValid("propName", "value"); } + @Test + public void shouldThrowOnInvalidURL() { + // Given: + final Validator validator = ConfigValidators.validUrl(); + + // Then: + expectedException.expect(ConfigException.class); + expectedException.expectMessage( + "Invalid value INVALID for configuration propName: Not valid URL: no protocol: INVALID"); + + // When: + validator.ensureValid("propName", "INVALID"); + } + + @Test + public void shouldNotThrowOnValidURL() { + // Given: + final Validator validator = ConfigValidators.validUrl(); + + // When: + validator.ensureValid("propName", "http://valid:25896/somePath"); + + // Then: did not throw. + } + private enum TestEnum { FOO, BAR } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java index d90aa65f18f3..94193b495cf1 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java @@ -48,7 +48,6 @@ import java.io.IOException; import java.util.Collections; import java.util.Optional; - import org.apache.avro.Schema; import org.junit.Before; import org.junit.Rule; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 24bff5b12850..549293a091c2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server; import static io.confluent.ksql.rest.server.KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG; +import static io.confluent.rest.RestConfig.LISTENERS_CONFIG; import static java.util.Objects.requireNonNull; import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper; @@ -77,6 +78,7 @@ import io.confluent.ksql.services.SimpleKsqlClient; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.RetryUtil; import io.confluent.ksql.util.Version; import io.confluent.ksql.util.WelcomeMsgUtils; @@ -87,6 +89,7 @@ import java.io.Console; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.nio.charset.StandardCharsets; @@ -106,6 +109,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.websocket.DeploymentException; import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpointConfig; @@ -213,7 +217,7 @@ public void setupResources(final Configurable config, final KsqlRestConfig ap } @Override - public void startAsync() throws Exception { + public void startAsync() { log.info("KSQL RESTful API listening on {}", StringUtils.join(getListeners(), ", ")); final KsqlConfig ksqlConfigWithPort = buildConfigWithPort(); configurables.forEach(c -> c.configure(ksqlConfigWithPort)); @@ -323,23 +327,46 @@ public void onShutdown() { } List getListeners() { - return Arrays.stream(server.getConnectors()) - .filter(connector -> connector instanceof ServerConnector) - .map(ServerConnector.class::cast) - .map(connector -> { - try { - final String protocol = new HashSet<>(connector.getProtocols()) - .stream() - .map(String::toLowerCase) - .anyMatch(s -> s.equals("ssl")) ? "https" : "http"; - - final int localPort = connector.getLocalPort(); - - return new URL(protocol, "localhost", localPort, ""); - } catch (final Exception e) { - throw new RuntimeException("Malformed listener", e); - } - }) + final Function> resolvePort = url -> + Arrays.stream(server.getConnectors()) + .filter(connector -> connector instanceof ServerConnector) + .map(ServerConnector.class::cast) + .filter(connector -> { + final String connectorProtocol = connector.getProtocols().stream() + .map(String::toLowerCase) + .anyMatch(p -> p.equals("ssl")) ? "https" : "http"; + + return connectorProtocol.equalsIgnoreCase(url.getProtocol()); + }) + .map(ServerConnector::getLocalPort) + .collect(Collectors.toSet()); + + final Function> resolveUrl = listener -> { + try { + final URL url = new URL(listener); + if (url.getPort() != 0) { + return Stream.of(url); + } + + // Need to resolve port using actual listeners: + return resolvePort.apply(url).stream() + .map(port -> { + try { + return new URL(url.getProtocol(), url.getHost(), port, url.getFile()); + } catch (MalformedURLException e) { + throw new KsqlServerException("Malformed URL specified in '" + + LISTENERS_CONFIG + "' config: " + listener, e); + } + }); + } catch (MalformedURLException e) { + throw new KsqlServerException("Malformed URL specified in '" + + LISTENERS_CONFIG + "' config: " + listener, e); + } + }; + + return restConfig.getList(LISTENERS_CONFIG).stream() + .flatMap(resolveUrl) + .distinct() .collect(Collectors.toList()); } @@ -681,21 +708,31 @@ private static void maybeCreateProcessingLogStream( * * @return true server config. */ - private KsqlConfig buildConfigWithPort() { + @VisibleForTesting + KsqlConfig buildConfigWithPort() { final Map props = ksqlConfigNoPort.originals(); - // Wire up KS IQ endpoint discovery to the FIRST listener: - final URL firstListener = getListeners().get(0); + // Wire up KS IQ so that pull queries work across KSQL nodes: props.put( KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.APPLICATION_SERVER_CONFIG, - firstListener.toString() + restConfig.getInterNodeListener(this::resolvePort).toString() ); - log.info("Using first listener URL for intra-node communication: {}", firstListener); - return new KsqlConfig(props); } + private int resolvePort(final URL listener) { + return getListeners().stream() + .filter(l -> + l.getProtocol().equals(listener.getProtocol()) + && l.getHost().equals(listener.getHost()) + ) + .map(URL::getPort) + .findFirst() + .orElseThrow(() -> + new IllegalStateException("Failed resolve port for listener: " + listener)); + } + private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) { final Set authenticationSkipPaths = new HashSet<>( restConfig.getList(RestConfig.AUTHENTICATION_SKIP_PATHS) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index e1971e44e749..398e23bb000b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -15,24 +15,49 @@ package io.confluent.ksql.rest.server; +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.configdef.ConfigValidators; import io.confluent.ksql.rest.DefaultErrorMessages; import io.confluent.ksql.rest.ErrorMessages; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlServerException; import io.confluent.rest.RestConfig; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Function; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KsqlRestConfig extends RestConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(KsqlRestConfig.class); + private static final String KSQL_CONFIG_PREFIX = "ksql."; - private static final String COMMAND_CONSUMER_PREFIX = + private static final String COMMAND_CONSUMER_PREFIX = KSQL_CONFIG_PREFIX + "server.command.consumer."; - private static final String COMMAND_PRODUCER_PREFIX = + private static final String COMMAND_PRODUCER_PREFIX = KSQL_CONFIG_PREFIX + "server.command.producer."; + public static final String ADVERTISED_LISTENER_CONFIG = + KSQL_CONFIG_PREFIX + "advertised.listener"; + private static final String ADVERTISED_LISTENER_DOC = + "The listener used for communication between KSQL nodes in the cluster, if different to the '" + + LISTENERS_CONFIG + "' config property. " + + "In IaaS environments, this may need to be different from the interface to which " + + "the server binds. If this is not set, the first value from listeners will be used. " + + "Unlike listeners, it is not valid to use the 0.0.0.0 (IPv4) or [::] (IPv6) " + + "wildcard addresses."; + static final String STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG = "query.stream.disconnect.check"; @@ -96,6 +121,13 @@ public class KsqlRestConfig extends RestConfig { static { CONFIG_DEF = baseConfigDef().define( + ADVERTISED_LISTENER_CONFIG, + Type.STRING, + null, + ConfigValidators.nullsAllowed(ConfigValidators.validUrl()), + Importance.HIGH, + ADVERTISED_LISTENER_DOC + ).define( STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG, Type.LONG, 1000L, @@ -155,10 +187,14 @@ public class KsqlRestConfig extends RestConfig { public KsqlRestConfig(final Map props) { super(CONFIG_DEF, props); - if (getList(RestConfig.LISTENERS_CONFIG).isEmpty()) { - throw new KsqlException(RestConfig.LISTENERS_CONFIG + " must be supplied. " - + RestConfig.LISTENERS_DOC); + + final List listeners = getList(LISTENERS_CONFIG); + if (listeners.isEmpty()) { + throw new KsqlException(LISTENERS_CONFIG + " must be supplied. " + LISTENERS_DOC); } + + listeners + .forEach(listener -> ConfigValidators.validUrl().ensureValid(LISTENERS_CONFIG, listener)); } // Bit of a hack to get around the fact that RestConfig.originals() is private for some reason @@ -183,4 +219,185 @@ Map getCommandProducerProperties() { public Map getKsqlConfigProperties() { return getOriginals(); } + + /** + * Determines which URL should be used to contact this node from other KSQL nodes. + * + *

Uses {@code INTER_NODE_LISTENER_CONFIG} by default, or first listener defined in + * {@code LISTENERS_CONFIG} if {@code INTER_NODE_LISTENER_CONFIG} not set. + * + *

Method takes a {@code portResolver} to resolve any auto-assigned port in + * {@code LISTENERS_CONFIG}, (i.e. port {@code 0}. + * + *

Any loopback or localhost in {@code LISTENERS_CONFIG} will be replaced with local machine + * name, though this is not guarenteed to work across machines in all circumstance. + * + * @param portResolver called to resolve port in first {@code LISTENERS_CONFIG} if {@code 0}. + * @return the resolved inter-node endpoint to use. + */ + public URL getInterNodeListener( + final Function portResolver + ) { + return getInterNodeListener(portResolver, LOGGER); + } + + @VisibleForTesting + URL getInterNodeListener( + final Function portResolver, + final Logger logger + ) { + return getString(ADVERTISED_LISTENER_CONFIG) == null + ? getInterNodeListenerFromFirstListener(portResolver, logger) + : getInterNodeListenerFromExplicitConfig(logger); + } + + private URL getInterNodeListenerFromFirstListener( + final Function portResolver, + final Logger logger + ) { + final List configValue = getList(LISTENERS_CONFIG); + + final URL firstListener = parseUrl(configValue.get(0), LISTENERS_CONFIG); + + final InetAddress address = parseInetAddress(firstListener.getHost()) + .orElseThrow(() -> new ConfigException( + LISTENERS_CONFIG, + configValue, + "Could not resolve first host" + )); + + final URL listener = sanitizeInterNodeListener( + firstListener, + portResolver, + address.isAnyLocalAddress() + ); + + logInterNodeListener( + logger, + listener, + Optional.of(address), + "first '" + LISTENERS_CONFIG + "'" + ); + + return listener; + } + + private URL getInterNodeListenerFromExplicitConfig(final Logger logger) { + final String configValue = getString(ADVERTISED_LISTENER_CONFIG); + + final URL listener = parseUrl(configValue, ADVERTISED_LISTENER_CONFIG); + + if (listener.getPort() <= 0) { + throw new ConfigException(ADVERTISED_LISTENER_CONFIG, configValue, "Must have valid port"); + } + + // Valid for address to not be resolvable, as may be _externally_ resolvable: + final Optional address = parseInetAddress(listener.getHost()); + + address.ifPresent(a -> { + if (a.isAnyLocalAddress()) { + throw new ConfigException(ADVERTISED_LISTENER_CONFIG, configValue, "Can not be wildcard"); + } + }); + + logInterNodeListener( + logger, + listener, + address, + "'" + ADVERTISED_LISTENER_CONFIG + "'" + ); + + return listener; + } + + private static void logInterNodeListener( + final Logger logger, + final URL listener, + final Optional address, + final String sourceConfigName + ) { + address.ifPresent(a -> { + if (a.isLoopbackAddress()) { + logger.warn( + "{} config is set to a loopback address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + sourceConfigName, listener + ); + } + + if (a.isAnyLocalAddress()) { + logger.warn( + "{} config uses wildcard address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + sourceConfigName, listener + ); + } + }); + + logger.info("Using {} config for intra-node communication: {}", sourceConfigName, listener); + } + + /** + * Used to sanitize the first `listener` config. + * + *

It will: + *

    + *
  • resolve any auto-port assignment to the actual port the server is listening on
  • + *
  • potentially, replace the host with localhost. This can be useful where the first + * listener is a wildcard address, e.g. {@code 0.0.0.0}/li> + *
+ * + * @param listener the URL to sanitize + * @param portResolver the function to call to resolve the port. + * @param replaceHost flag indicating if the host in the URL should be replaced with localhost. + * @return the sanitized URL. + */ + private static URL sanitizeInterNodeListener( + final URL listener, + final Function portResolver, + final boolean replaceHost + ) { + if (!replaceHost && listener.getPort() > 0) { + return listener; + } + + final String host = replaceHost + ? getLocalHostName() + : listener.getHost(); + + final int port = listener.getPort() == 0 + ? portResolver.apply(listener) + : listener.getPort(); + + try { + return new URL(listener.getProtocol(), host, port, listener.getFile()); + } catch (final MalformedURLException e) { + throw new KsqlServerException("Resolved first listener to malformed URL", e); + } + } + + private static URL parseUrl(final String address, final String configName) { + try { + return new URL(address); + } catch (final MalformedURLException e) { + throw new ConfigException(configName, address, e.getMessage()); + } + } + + + private static Optional parseInetAddress(final String address) { + try { + return Optional.of(InetAddress.getByName(address)); + } catch (final UnknownHostException e) { + return Optional.empty(); + } + } + + private static String getLocalHostName() { + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + throw new KsqlServerException("Failed to obtain local host info", e); + } + } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java new file mode 100644 index 000000000000..6f44fb7f4e38 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationFunctionalTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; +import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent; +import io.confluent.rest.RestConfig; +import java.net.URL; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.CommonClientConfigs; +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Test; + +public class KsqlRestApplicationFunctionalTest { + + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = + EmbeddedSingleNodeKafkaCluster.build(); + + private KsqlRestApplication app; + + @After + public void tearDown() throws Exception { + if (app != null) { + app.stop(); + } + } + + @Test + public void shouldResolveListenersWithExplicitAndAutoAssignedPorts() { + // Given: + givenAppStartedWith(ImmutableMap + .of(RestConfig.LISTENERS_CONFIG, "http://127.0.0.1:0,https://localHost:8088")); + + // When: + final List listeners = app.getListeners(); + + // Then: + assertThat(listeners, hasSize(2)); + assertThat(listeners.get(0).getProtocol(), is("http")); + assertThat(listeners.get(0).getHost(), is("127.0.0.1")); + assertThat(listeners.get(0).getPort(), is(not(0))); + assertThat(listeners.get(1).getProtocol(), is("https")); + assertThat(listeners.get(1).getHost(), is("localHost")); + assertThat(listeners.get(1).getPort(), is(8088)); + } + + @Test + public void shouldResolveMultipleListenersPerProtocol() { + // Given: + givenAppStartedWith(ImmutableMap + .of(RestConfig.LISTENERS_CONFIG, "http://localhost:0,http://localhost:0")); + + // When: + final List listeners = app.getListeners(); + + // Then: + assertThat(listeners, hasSize(2)); + assertThat(listeners.get(0).getProtocol(), is("http")); + assertThat(listeners.get(0).getHost(), is("localhost")); + assertThat(listeners.get(0).getPort(), is(not(0))); + assertThat(listeners.get(1).getProtocol(), is("http")); + assertThat(listeners.get(1).getHost(), is("localhost")); + assertThat(listeners.get(1).getPort(), is(not(0))); + } + + private void givenAppStartedWith(final Map config) { + + final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() + .putAll(config) + .put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) + .build() + ); + + app = KsqlRestApplication.buildApplication( + restConfig, + KsqlVersionCheckerAgent::new + ); + + try { + app.start(); + } catch (Exception e) { + throw new AssertionError("Failed to start", e); + } + } +} \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 712fa7fe61bd..cedc84acb5e2 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -15,6 +15,8 @@ package io.confluent.ksql.rest.server; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; @@ -28,20 +30,16 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import io.confluent.ksql.KsqlExecutionContext; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.logging.processing.ProcessingLogContext; -import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; -import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; -import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.CommandStore; -import io.confluent.ksql.rest.server.computation.QueuedCommandStatus; import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; import io.confluent.ksql.rest.server.resources.KsqlResource; @@ -59,13 +57,16 @@ import io.confluent.rest.RestConfig; import java.util.Collections; import java.util.LinkedList; +import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.function.Consumer; import javax.ws.rs.core.Configurable; -import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.streams.StreamsConfig; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.InOrder; import org.mockito.Mock; @@ -78,24 +79,16 @@ public class KsqlRestApplicationTest { private static final String LOG_TOPIC_NAME = "log_topic"; private static final String CMD_TOPIC_NAME = "command_topic"; - private final KsqlRestConfig restConfig = - new KsqlRestConfig( - Collections.singletonMap(RestConfig.LISTENERS_CONFIG, - "http://localhost:8088")); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); @Mock private ServiceContext serviceContext; @Mock private KsqlEngine ksqlEngine; @Mock - private MetaStore metaStore; - @Mock - private KsqlExecutionContext sandBox; - @Mock private KsqlConfig ksqlConfig; @Mock - private KsqlRestConfig ksqlRestConfig; - @Mock private ProcessingLogConfig processingLogConfig; @Mock private CommandRunner commandRunner; @@ -112,8 +105,6 @@ public class KsqlRestApplicationTest { @Mock private CommandStore commandQueue; @Mock - private QueuedCommandStatus queuedCommandStatus; - @Mock private KsqlSecurityExtension securityExtension; @Mock private ProcessingLogContext processingLogContext; @@ -131,10 +122,10 @@ public class KsqlRestApplicationTest { private PreparedStatement preparedStatement; @Mock private Consumer rocksDBConfigSetterHandler; - @Mock - private Producer transactionalProducer; + private String logCreateStatement; private KsqlRestApplication app; + private KsqlRestConfig restConfig; @SuppressWarnings("unchecked") @Before @@ -163,26 +154,7 @@ public void setUp() { ksqlConfig ); - app = new KsqlRestApplication( - serviceContext, - ksqlEngine, - ksqlConfig, - restConfig, - commandRunner, - commandQueue, - rootDocument, - statusResource, - streamedQueryResource, - ksqlResource, - versionCheckerAgent, - KsqlRestServiceContextBinder::new, - securityExtension, - serverState, - processingLogContext, - ImmutableList.of(precondition1, precondition2), - ImmutableList.of(ksqlResource, streamedQueryResource), - rocksDBConfigSetterHandler - ); + givenAppWithRestConfig(ImmutableMap.of(RestConfig.LISTENERS_CONFIG, "http://localhost:0")); } @Test @@ -378,4 +350,65 @@ public void shouldConfigureRocksDBConfigSetter() { // Then: verify(rocksDBConfigSetterHandler).accept(ksqlConfig); } + + @Test + public void shouldConfigureIQWithInterNodeListenerIfSet() { + // Given: + givenAppWithRestConfig(ImmutableMap.of( + RestConfig.LISTENERS_CONFIG, "http://localhost:0", + KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "https://some.host:12345" + )); + + // When: + final KsqlConfig ksqlConfig = app.buildConfigWithPort(); + + // Then: + assertThat( + ksqlConfig.getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG), + is("https://some.host:12345") + ); + } + + @Test + public void shouldConfigureIQWithFirstListenerIfInterNodeNotSet() { + // Given: + givenAppWithRestConfig(ImmutableMap.of( + RestConfig.LISTENERS_CONFIG, "http://some.host:1244,https://some.other.host:1258" + )); + + // When: + final KsqlConfig ksqlConfig = app.buildConfigWithPort(); + + // Then: + assertThat( + ksqlConfig.getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG), + is("http://some.host:1244") + ); + } + + private void givenAppWithRestConfig(final Map restConfigMap) { + + restConfig = new KsqlRestConfig(restConfigMap); + + app = new KsqlRestApplication( + serviceContext, + ksqlEngine, + ksqlConfig, + restConfig, + commandRunner, + commandQueue, + rootDocument, + statusResource, + streamedQueryResource, + ksqlResource, + versionCheckerAgent, + KsqlRestServiceContextBinder::new, + securityExtension, + serverState, + processingLogContext, + ImmutableList.of(precondition1, precondition2), + ImmutableList.of(ksqlResource, streamedQueryResource), + rocksDBConfigSetterHandler + ); + } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java index ce0249253740..f16e20d6133e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java @@ -16,60 +16,512 @@ package io.confluent.ksql.rest.server; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; +import static io.confluent.ksql.rest.server.KsqlRestConfig.ADVERTISED_LISTENER_CONFIG; +import static io.confluent.rest.RestConfig.LISTENERS_CONFIG; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.rest.RestConfig; -import java.util.HashMap; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; import java.util.Map; +import java.util.function.Function; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.streams.StreamsConfig; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +@RunWith(MockitoJUnitRunner.class) public class KsqlRestConfigTest { - private static final Map MIN_VALID_CONFIGS = ImmutableMap.builder() + private static final Map MIN_VALID_CONFIGS = ImmutableMap.builder() .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") - .put(RestConfig.LISTENERS_CONFIG, "http://localhost:8088") + .put(LISTENERS_CONFIG, "http://localhost:8088") .build(); - @Test - public void testGetKsqlConfigProperties() { - final Map inputProperties = new HashMap<>(MIN_VALID_CONFIGS); - inputProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - inputProperties.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test"); + private static final String QUOTED_INTER_NODE_LISTENER_CONFIG = + "'" + ADVERTISED_LISTENER_CONFIG + "'"; + + private static final String QUOTED_FIRST_LISTENER_CONFIG = + "first '" + LISTENERS_CONFIG + "'"; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private Function portResolver; + @Mock + private Logger logger; - final KsqlRestConfig config = new KsqlRestConfig(inputProperties); + @Test + public void shouldGetKsqlConfigProperties() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test") + .build() + ); + // When: final Map ksqlConfigProperties = config.getKsqlConfigProperties(); - final Map expectedKsqlConfigProperties = new HashMap<>(); - expectedKsqlConfigProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - expectedKsqlConfigProperties.put(RestConfig.LISTENERS_CONFIG, "http://localhost:8088"); - expectedKsqlConfigProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - expectedKsqlConfigProperties.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test"); - assertThat(ksqlConfigProperties, equalTo(expectedKsqlConfigProperties)); + + // Then: + assertThat(ksqlConfigProperties, is(ImmutableMap.of( + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", + LISTENERS_CONFIG, "http://localhost:8088", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", + KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test")) + ); } // Just a sanity check to make sure that, although they contain identical mappings, successive maps returned by calls // to KsqlRestConfig.getOriginals() do not actually return the same object (mutability would then be an issue) @Test - public void testOriginalsReplicability() { - final String COMMIT_INTERVAL_MS = "10"; - - final Map inputProperties = new HashMap<>(MIN_VALID_CONFIGS); - inputProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); - final KsqlRestConfig config = new KsqlRestConfig(inputProperties); + public void shouldReturnDifferentMapOnEachCallToOriginals() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10") + .build() + ); final Map originals1 = config.getOriginals(); final Map originals2 = config.getOriginals(); - assertEquals(originals1, originals2); - assertNotSame(originals1, originals2); - assertEquals(COMMIT_INTERVAL_MS, originals1.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); - assertEquals(COMMIT_INTERVAL_MS, originals2.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + // When: + originals1.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "99"); + + // Then: + assertThat(originals2.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), is("10")); + } + + @Test + public void shouldThrowIfAnyListenerIsInvalidUrl() { + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value INVALID for configuration " + + LISTENERS_CONFIG + + ": Not valid URL: no protocol: INVALID" + ); + + // Given: + new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, "http://localhost:9875,INVALID") + .build() + ); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerIsInvalidUrl() { + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value INVALID for configuration " + + ADVERTISED_LISTENER_CONFIG + + ": Not valid URL: no protocol: INVALID" + ); + + // Given: + new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, "INVALID") + .build() + ); + } + + @Test + public void shouldUseExplicitInterNodeListenerSetToUnresolvableHost() { + // Given: + final URL expected = url("https://unresolvable.host:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerSetToResolvableHost() { + // Given: + final URL expected = url("https://example.com:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerIfSetToLocalHost() { + // Given: + final URL expected = url("https://localHost:52368"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerIfSetToIpv4Loopback() { + // Given: + final URL expected = url("https://127.0.0.2:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldUseExplicitInterNodeListenerIfSetToIpv6Loopback() { + // Given: + final URL expected = url("https://[::1]:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, expected.toString()) + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_INTER_NODE_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerHasAutoPortAssignment() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, "https://unresolvable.host:0") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value https://unresolvable.host:0 for configuration " + + ADVERTISED_LISTENER_CONFIG + + ": Must have valid port" + ); + + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerHasIpv4WildcardAddress() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, "https://0.0.0.0:12589") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value https://0.0.0.0:12589 for configuration " + + ADVERTISED_LISTENER_CONFIG + + ": Can not be wildcard" + ); + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldThrowIfExplicitInterNodeListenerHasIpv6WildcardAddress() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ADVERTISED_LISTENER_CONFIG, "https://[::]:1236") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value https://[::]:1236 for configuration " + + ADVERTISED_LISTENER_CONFIG + + ": Can not be wildcard" + ); + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldThrowIfOnGetInterNodeListenerIfFirstListenerSetToUnresolvableHost() { + // Given: + final URL expected = url("https://unresolvable.host:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // Expect: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Invalid value " + + "[https://unresolvable.host:12345, http://localhost:2589] for configuration " + + LISTENERS_CONFIG + + ": Could not resolve first host" + ); + + // When: + config.getInterNodeListener(portResolver, logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToResolvableHost() { + // Given: + final URL expected = url("https://example.com:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToLocalHost() { + // Given: + final URL expected = url("https://localHost:52368"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToIpv4Loopback() { + // Given: + final URL expected = url("https://127.0.0.2:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerSetToIpv6Loopback() { + // Given: + final URL expected = url("https://[::1]:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, expected.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsLoopBackWarning(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerWithAutoPortAssignment() { + // Given: + final URL autoPort = url("https://example.com:0"); + + when(portResolver.apply(any())).thenReturn(2222); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, autoPort.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + final URL expected = url("https://example.com:2222"); + + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerWithIpv4WildcardAddress() { + // Given: + final URL wildcard = url("https://0.0.0.0:12589"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, wildcard.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + final URL expected = url("https://" + getLocalHostName() + ":12589"); + + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsWildcardWarning(expected); + verifyNoMoreInteractions(logger); + } + + @Test + public void shouldResolveInterNodeListenerToFirstListenerWithIpv6WildcardAddress() { + // Given: + final URL wildcard = url("https://[::]:12345"); + + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + .put(LISTENERS_CONFIG, wildcard.toString() + ",http://localhost:2589") + .build() + ); + + // When: + final URL actual = config.getInterNodeListener(portResolver, logger); + + // Then: + final URL expected = url("https://" + getLocalHostName() + ":12345"); + + assertThat(actual, is(expected)); + verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG); + verifyLogsWildcardWarning(expected); + verifyNoMoreInteractions(logger); + } + + private void verifyLogsInterNodeListener(final URL listener, final String sourceConfig) { + verify(logger).info( + "Using {} config for intra-node communication: {}", + sourceConfig, + listener + ); + } + + private void verifyLogsLoopBackWarning(final URL listener, final String sourceConfig) { + verify(logger).warn( + "{} config is set to a loopback address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + sourceConfig, + listener + ); + } + + private void verifyLogsWildcardWarning(final URL listener) { + verify(logger).warn( + "{} config uses wildcard address: {}. Intra-node communication will only work " + + "between nodes running on the same machine.", + QUOTED_FIRST_LISTENER_CONFIG, + listener + ); + } + + private static URL url(final String address) { + try { + return new URL(address); + } catch (final MalformedURLException e) { + throw new AssertionError("Invalid URL in test: " + address, e); + } + } + + private static String getLocalHostName() { + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + throw new AssertionError("Failed to obtain local host info", e); + } } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index facc873f2875..9876553979c7 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -51,7 +51,6 @@ import io.confluent.ksql.services.FakeKafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; -import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import java.time.Duration; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 8f4ee101ae2b..fb4a23ffe9c4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -15,9 +15,9 @@ package io.confluent.ksql.rest.server.resources.streaming; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS; import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode; import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage; -import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; import static javax.ws.rs.core.Response.Status.FORBIDDEN; diff --git a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java index f5f5ad0ac077..de70d02321ee 100644 --- a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java +++ b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java @@ -16,7 +16,6 @@ package io.confluent.ksql.rest; import static javax.ws.rs.core.Response.Status.FORBIDDEN; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -25,6 +24,7 @@ import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.util.KsqlException; +import javax.ws.rs.core.Response; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.junit.Before; import org.junit.Test; @@ -32,8 +32,6 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import javax.ws.rs.core.Response; - @RunWith(MockitoJUnitRunner.class) public class ErrorsTest {