Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wire in the KS config needed for point queries (MINOR) #3251

Merged
merged 5 commits into from
Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server;

import static io.confluent.ksql.rest.server.KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG;
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -46,6 +47,7 @@
import io.confluent.ksql.rest.server.computation.StatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder;
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.RootDocument;
Expand All @@ -64,7 +66,6 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injectors;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.RetryUtil;
Expand All @@ -86,7 +87,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executors;
Expand All @@ -100,6 +100,7 @@
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
import javax.ws.rs.core.Configurable;
import org.apache.kafka.streams.StreamsConfig;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.jsr356.server.ServerContainer;
import org.glassfish.hk2.utilities.Binder;
Expand All @@ -115,7 +116,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple

public static final String COMMANDS_STREAM_NAME = "KSQL_COMMANDS";

private final KsqlConfig ksqlConfig;
private final KsqlConfig ksqlConfigNoPort;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've renamed this to make it clear it does not contain the port info. My thinking being that people may think twice before using it blindly.

private final KsqlEngine ksqlEngine;
private final CommandRunner commandRunner;
private final CommandStore commandStore;
Expand All @@ -131,6 +132,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple
private final ProcessingLogContext processingLogContext;
private final List<KsqlServerPrecondition> preconditions;
private final KsqlConnect ksqlConnect;
private final List<KsqlConfigurable> configurables;

public static String getCommandsStreamName() {
return COMMANDS_STREAM_NAME;
Expand All @@ -155,39 +157,34 @@ public static String getCommandsStreamName() {
final ServerState serverState,
final ProcessingLogContext processingLogContext,
final List<KsqlServerPrecondition> preconditions,
final KsqlConnect ksqlConnect
final KsqlConnect ksqlConnect,
final List<KsqlConfigurable> configurables
) {
super(config);
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner");
this.rootDocument = Objects.requireNonNull(rootDocument, "rootDocument");
this.statusResource = Objects.requireNonNull(statusResource, "statusResource");
this.streamedQueryResource =
Objects.requireNonNull(streamedQueryResource, "streamedQueryResource");
this.ksqlResource = Objects.requireNonNull(ksqlResource, "ksqlResource");
this.commandStore = Objects.requireNonNull(commandStore, "commandStore");
this.serverState = Objects.requireNonNull(serverState, "serverState");
this.processingLogContext = Objects.requireNonNull(
processingLogContext,
"processingLogContext");
this.preconditions = Objects.requireNonNull(preconditions, "preconditions");
this.versionCheckerAgent =
Objects.requireNonNull(versionCheckerAgent, "versionCheckerAgent");
this.serviceContextBinderFactory = Objects.requireNonNull(
serviceContextBinderFactory, "serviceContextBinderFactory");
this.securityExtension = Objects.requireNonNull(
securityExtension, "securityExtension"
);
this.ksqlConnect = Objects
.requireNonNull(ksqlConnect, "ksqlConnect");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.ksqlConfigNoPort = requireNonNull(ksqlConfig, "ksqlConfig");
this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine");
this.commandRunner = requireNonNull(commandRunner, "commandRunner");
this.rootDocument = requireNonNull(rootDocument, "rootDocument");
this.statusResource = requireNonNull(statusResource, "statusResource");
this.streamedQueryResource = requireNonNull(streamedQueryResource, "streamedQueryResource");
this.ksqlResource = requireNonNull(ksqlResource, "ksqlResource");
this.commandStore = requireNonNull(commandStore, "commandStore");
this.serverState = requireNonNull(serverState, "serverState");
this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext");
this.preconditions = requireNonNull(preconditions, "preconditions");
this.versionCheckerAgent = requireNonNull(versionCheckerAgent, "versionCheckerAgent");
this.serviceContextBinderFactory =
requireNonNull(serviceContextBinderFactory, "serviceContextBinderFactory");
this.securityExtension = requireNonNull(securityExtension, "securityExtension");
this.ksqlConnect = requireNonNull(ksqlConnect, "ksqlConnect");
this.configurables = requireNonNull(configurables, "configurables");
}

@Override
public void setupResources(final Configurable<?> config, final KsqlRestConfig appConfig) {
config.register(rootDocument);
config.register(new ServerInfoResource(serviceContext, ksqlConfig));
config.register(new ServerInfoResource(serviceContext, ksqlConfigNoPort));
config.register(statusResource);
config.register(ksqlResource);
config.register(streamedQueryResource);
Expand All @@ -198,6 +195,8 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap
@Override
public void start() throws Exception {
super.start();
final KsqlConfig ksqlConfigWithPort = buildConfigWithPort();
configurables.forEach(c -> c.configure(ksqlConfigWithPort));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the Jetty server has been started we know the actual ports, so can now build more complete config and configure those components that are lazily configured.

startKsql();
commandRunner.start();
ksqlConnect.startAsync();
Expand Down Expand Up @@ -256,11 +255,11 @@ private void initialize() {
ProcessingLogServerUtils.maybeCreateProcessingLogTopic(
serviceContext.getTopicClient(),
processingLogContext.getConfig(),
ksqlConfig
ksqlConfigNoPort
);
maybeCreateProcessingLogStream(
processingLogContext.getConfig(),
ksqlConfig,
ksqlConfigNoPort,
ksqlEngine,
commandStore
);
Expand Down Expand Up @@ -341,7 +340,7 @@ public void configureBaseApplication(
new JacksonMessageBodyProvider(JsonMapper.INSTANCE.mapper);
config.register(jsonProvider);
config.register(JsonParseExceptionMapper.class);
config.register(serviceContextBinderFactory.apply(ksqlConfig, securityExtension));
config.register(serviceContextBinderFactory.apply(ksqlConfigNoPort, securityExtension));

// Don't want to buffer rows when streaming JSON in a request to the query resource
config.property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);
Expand All @@ -368,7 +367,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {

final StatementParser statementParser = new StatementParser(ksqlEngine);
final TopicAccessValidator topicAccessValidator =
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext);
TopicAccessValidatorFactory.create(ksqlConfigNoPort, serviceContext);

container.addEndpoint(
ServerEndpointConfig.Builder
Expand All @@ -381,7 +380,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
@SuppressWarnings("unchecked")
public <T> T getEndpointInstance(final Class<T> endpointClass) {
return (T) new WSQueryEndpoint(
ksqlConfig,
buildConfigWithPort(),
JsonMapper.INSTANCE.mapper,
statementParser,
ksqlEngine,
Expand Down Expand Up @@ -449,18 +448,12 @@ static KsqlRestApplication buildApplication(
final String commandTopic = KsqlInternalTopicUtils.getTopicName(
ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX);

final StatementParser statementParser = new StatementParser(ksqlEngine);

final CommandStore commandStore = CommandStore.Factory.create(
commandTopic,
restConfig.getCommandConsumerProperties(),
restConfig.getCommandProducerProperties());

final StatementExecutor statementExecutor = new StatementExecutor(
ksqlConfig,
ksqlEngine,
statementParser
);
final StatementExecutor statementExecutor = new StatementExecutor(ksqlEngine);

final RootDocument rootDocument = new RootDocument();

Expand All @@ -476,9 +469,7 @@ static KsqlRestApplication buildApplication(
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext);

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlConfig,
ksqlEngine,
statementParser,
commandStore,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
Expand All @@ -488,13 +479,12 @@ static KsqlRestApplication buildApplication(
);

final KsqlResource ksqlResource = new KsqlResource(
ksqlConfig,
ksqlEngine,
commandStore,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
Injectors.DEFAULT,
topicAccessValidator);
topicAccessValidator
);

final List<String> managedTopics = new LinkedList<>();
managedTopics.add(commandTopic);
Expand All @@ -505,7 +495,7 @@ static KsqlRestApplication buildApplication(
statementExecutor,
commandStore,
maxStatementRetries,
new ClusterTerminator(ksqlConfig, ksqlEngine, serviceContext, managedTopics),
new ClusterTerminator(ksqlEngine, serviceContext, managedTopics),
serverState
);

Expand All @@ -523,6 +513,12 @@ static KsqlRestApplication buildApplication(
)
);

final List<KsqlConfigurable> configurables = ImmutableList.of(
ksqlResource,
streamedQueryResource,
statementExecutor
);

return new KsqlRestApplication(
serviceContext,
ksqlEngine,
Expand All @@ -540,23 +536,28 @@ static KsqlRestApplication buildApplication(
serverState,
processingLogContext,
preconditions,
ksqlConnect
ksqlConnect,
configurables
);
}

private void registerCommandTopic() {

final String commandTopic = commandStore.getCommandTopicName();

KsqlInternalTopicUtils.ensureTopic(commandTopic, ksqlConfig, serviceContext.getTopicClient());
KsqlInternalTopicUtils.ensureTopic(
commandTopic,
ksqlConfigNoPort,
serviceContext.getTopicClient()
);

final String createCmd = "CREATE STREAM " + COMMANDS_STREAM_NAME
+ " (STATEMENT STRING)"
+ " WITH(VALUE_FORMAT='JSON', KAFKA_TOPIC='" + commandTopic + "');";

final ParsedStatement parsed = ksqlEngine.parse(createCmd).get(0);
final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed);
ksqlEngine.execute(ConfiguredStatement.of(prepared, ImmutableMap.of(), ksqlConfig));
ksqlEngine.execute(ConfiguredStatement.of(prepared, ImmutableMap.of(), ksqlConfigNoPort));
}

private static KsqlSecurityExtension loadSecurityExtension(final KsqlConfig ksqlConfig) {
Expand Down Expand Up @@ -621,4 +622,22 @@ static void maybeCreateProcessingLogStream(

commandQueue.enqueueCommand(configured.get());
}

/**
* Build a complete config with the KS IQ application.server set.
*
* @return true server config.
*/
private KsqlConfig buildConfigWithPort() {
final Map<String, Object> props = ksqlConfigNoPort.originals();

// Wire up KS IQ endpoint discovery to the FIRST listener:
final URL firstListener = getListeners().get(0);
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
props.put(
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.APPLICATION_SERVER_CONFIG,
firstListener.toString()
);

return new KsqlConfig(props);
}
}
Loading