Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pull queries should work across nodes #4169

Merged
merged 4 commits into from
Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/ksql-production-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
### HTTP ###
# The URL the KSQL server will listen on:
# The default is any IPv4 interface on the machine.
# NOTE: If set to wildcard or loopback set 'inter.node.listener' to enable pull queries across machines
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
listeners=http://0.0.0.0:8088

# Use the 'listeners' line below for any IPv6 interface on the machine.
# listeners=http://[::]:8088

# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address
# 'inter.node.listener' must be set to the URL other KSQL nodes should use to reach this node.
# inter.node.listener=?

### HTTPS ###
# To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above
# uncomment and complete the properties below.
# See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https
#
# listeners=https://0.0.0.0:8088
# inter.node.listener=?
# ssl.keystore.location=?
# ssl.keystore.password=?
# ssl.key.password=?
Expand Down
6 changes: 6 additions & 0 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
### HTTP ###
# The URL the KSQL server will listen on:
# The default is any IPv4 interface on the machine.
# NOTE: If set to wildcard or loopback set 'inter.node.listener' to enable pull queries across machines
listeners=http://0.0.0.0:8088

# Use the 'listeners' line below for any IPv6 interface on the machine.
# listeners=http://[::]:8088

# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address
# 'inter.node.listener' must be set to the URL other KSQL nodes should use to reach this node.
# inter.node.listener=?

### HTTPS ###
# To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above
# uncomment and complete the properties below.
# See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https
#
# listeners=https://0.0.0.0:8088
# inter.node.listener=?
# ssl.keystore.location=?
# ssl.keystore.password=?
# ssl.key.password=?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import java.net.URL;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -78,6 +79,19 @@ public static <T extends Enum<T>> Validator enumValues(final Class<T> enumClass)
return ValidCaseInsensitiveString.in(validValues);
}

public static Validator validUrl() {
return (name, val) -> {
if (!(val instanceof String)) {
throw new IllegalArgumentException("validator should only be used with STRING defs");
}
try {
new URL((String)val);
} catch (Exception e) {
throw new ConfigException(name, val, "Not valid URL: " + e.getMessage());
}
};
}

public static final class ValidCaseInsensitiveString implements Validator {

private final List<String> validStrings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,31 @@ public void shouldThrowIfParserThrows() {
validator.ensureValid("propName", "value");
}

@Test
public void shouldThrowOnInvalidURL() {
// Given:
final Validator validator = ConfigValidators.validUrl();

// Then:
expectedException.expect(ConfigException.class);
expectedException.expectMessage(
"Invalid value INVALID for configuration propName: Not valid URL: no protocol: INVALID");

// When:
validator.ensureValid("propName", "INVALID");
}

@Test
public void shouldNotThrowOnValidURL() {
// Given:
final Validator validator = ConfigValidators.validUrl();

// When:
validator.ensureValid("propName", "http://valid:25896/somePath");

// Then: did not throw.
}

private enum TestEnum {
FOO, BAR
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;

import org.apache.avro.Schema;
import org.junit.Before;
import org.junit.Rule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server;

import static io.confluent.ksql.rest.server.KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG;
import static io.confluent.rest.RestConfig.LISTENERS_CONFIG;
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
Expand Down Expand Up @@ -77,6 +78,7 @@
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.RetryUtil;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
Expand All @@ -87,6 +89,7 @@
import java.io.Console;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
Expand All @@ -106,6 +109,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
Expand Down Expand Up @@ -213,7 +217,7 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap
}

@Override
public void startAsync() throws Exception {
public void startAsync() {
log.info("KSQL RESTful API listening on {}", StringUtils.join(getListeners(), ", "));
final KsqlConfig ksqlConfigWithPort = buildConfigWithPort();
configurables.forEach(c -> c.configure(ksqlConfigWithPort));
Expand Down Expand Up @@ -323,23 +327,46 @@ public void onShutdown() {
}

List<URL> getListeners() {
return Arrays.stream(server.getConnectors())
.filter(connector -> connector instanceof ServerConnector)
.map(ServerConnector.class::cast)
.map(connector -> {
try {
final String protocol = new HashSet<>(connector.getProtocols())
.stream()
.map(String::toLowerCase)
.anyMatch(s -> s.equals("ssl")) ? "https" : "http";

final int localPort = connector.getLocalPort();

return new URL(protocol, "localhost", localPort, "");
} catch (final Exception e) {
throw new RuntimeException("Malformed listener", e);
}
})
final Function<URL, Set<Integer>> resolvePort = url ->
Arrays.stream(server.getConnectors())
.filter(connector -> connector instanceof ServerConnector)
.map(ServerConnector.class::cast)
.filter(connector -> {
final String connectorProtocol = connector.getProtocols().stream()
.map(String::toLowerCase)
.anyMatch(p -> p.equals("ssl")) ? "https" : "http";

return connectorProtocol.equalsIgnoreCase(url.getProtocol());
})
.map(ServerConnector::getLocalPort)
.collect(Collectors.toSet());

final Function<String, Stream<URL>> resolveUrl = listener -> {
try {
final URL url = new URL(listener);
if (url.getPort() != 0) {
return Stream.of(url);
}

// Need to resolve port using actual listeners:
return resolvePort.apply(url).stream()
.map(port -> {
try {
return new URL(url.getProtocol(), url.getHost(), port, url.getFile());
} catch (MalformedURLException e) {
throw new KsqlServerException("Malformed URL specified in '"
+ LISTENERS_CONFIG + "' config: " + listener, e);
}
});
} catch (MalformedURLException e) {
throw new KsqlServerException("Malformed URL specified in '"
+ LISTENERS_CONFIG + "' config: " + listener, e);
}
};

return restConfig.getList(LISTENERS_CONFIG).stream()
.flatMap(resolveUrl)
.distinct()
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -681,21 +708,31 @@ private static void maybeCreateProcessingLogStream(
*
* @return true server config.
*/
private KsqlConfig buildConfigWithPort() {
@VisibleForTesting
KsqlConfig buildConfigWithPort() {
final Map<String, Object> props = ksqlConfigNoPort.originals();

// Wire up KS IQ endpoint discovery to the FIRST listener:
final URL firstListener = getListeners().get(0);
// Wire up KS IQ so that pull queries work across KSQL nodes:
props.put(
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.APPLICATION_SERVER_CONFIG,
firstListener.toString()
restConfig.getInterNodeListener(this::resolvePort).toString()
);

log.info("Using first listener URL for intra-node communication: {}", firstListener);

return new KsqlConfig(props);
}

private int resolvePort(final URL listener) {
return getListeners().stream()
.filter(l ->
l.getProtocol().equals(listener.getProtocol())
&& l.getHost().equals(listener.getHost())
)
.map(URL::getPort)
.findFirst()
.orElseThrow(() ->
new IllegalStateException("Failed resolve port for listener: " + listener));
}

private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) {
final Set<String> authenticationSkipPaths = new HashSet<>(
restConfig.getList(RestConfig.AUTHENTICATION_SKIP_PATHS)
Expand Down
Loading