-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: wire in the KS config needed for point queries (MINOR) #3251
Merged
big-andy-coates
merged 5 commits into
confluentinc:master
from
big-andy-coates:wire_app_server
Aug 28, 2019
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
deea16f
feat: wire in the KS config needed for point queries (MINOR)
big-andy-coates b94aa10
Merge branch 'master' into wire_app_server
big-andy-coates adf7a23
Merge branch 'master' into wire_app_server
big-andy-coates 23a6bfc
chore: merge branch 'master' into wire_app_server
big-andy-coates 09957f7
chore: almog's requested changes
big-andy-coates File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
package io.confluent.ksql.rest.server; | ||
|
||
import static io.confluent.ksql.rest.server.KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper; | ||
import com.google.common.annotations.VisibleForTesting; | ||
|
@@ -46,6 +47,7 @@ | |
import io.confluent.ksql.rest.server.computation.StatementExecutor; | ||
import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder; | ||
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; | ||
import io.confluent.ksql.rest.server.resources.KsqlConfigurable; | ||
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper; | ||
import io.confluent.ksql.rest.server.resources.KsqlResource; | ||
import io.confluent.ksql.rest.server.resources.RootDocument; | ||
|
@@ -64,7 +66,6 @@ | |
import io.confluent.ksql.services.ServiceContext; | ||
import io.confluent.ksql.services.ServiceContextFactory; | ||
import io.confluent.ksql.statement.ConfiguredStatement; | ||
import io.confluent.ksql.statement.Injectors; | ||
import io.confluent.ksql.util.KsqlConfig; | ||
import io.confluent.ksql.util.KsqlException; | ||
import io.confluent.ksql.util.RetryUtil; | ||
|
@@ -86,7 +87,6 @@ | |
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import java.util.concurrent.Executors; | ||
|
@@ -100,6 +100,7 @@ | |
import javax.websocket.server.ServerEndpointConfig; | ||
import javax.websocket.server.ServerEndpointConfig.Configurator; | ||
import javax.ws.rs.core.Configurable; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.eclipse.jetty.server.ServerConnector; | ||
import org.eclipse.jetty.websocket.jsr356.server.ServerContainer; | ||
import org.glassfish.hk2.utilities.Binder; | ||
|
@@ -115,7 +116,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple | |
|
||
public static final String COMMANDS_STREAM_NAME = "KSQL_COMMANDS"; | ||
|
||
private final KsqlConfig ksqlConfig; | ||
private final KsqlConfig ksqlConfigNoPort; | ||
private final KsqlEngine ksqlEngine; | ||
private final CommandRunner commandRunner; | ||
private final CommandStore commandStore; | ||
|
@@ -131,6 +132,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple | |
private final ProcessingLogContext processingLogContext; | ||
private final List<KsqlServerPrecondition> preconditions; | ||
private final KsqlConnect ksqlConnect; | ||
private final List<KsqlConfigurable> configurables; | ||
|
||
public static String getCommandsStreamName() { | ||
return COMMANDS_STREAM_NAME; | ||
|
@@ -155,39 +157,34 @@ public static String getCommandsStreamName() { | |
final ServerState serverState, | ||
final ProcessingLogContext processingLogContext, | ||
final List<KsqlServerPrecondition> preconditions, | ||
final KsqlConnect ksqlConnect | ||
final KsqlConnect ksqlConnect, | ||
final List<KsqlConfigurable> configurables | ||
) { | ||
super(config); | ||
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); | ||
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); | ||
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); | ||
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); | ||
this.rootDocument = Objects.requireNonNull(rootDocument, "rootDocument"); | ||
this.statusResource = Objects.requireNonNull(statusResource, "statusResource"); | ||
this.streamedQueryResource = | ||
Objects.requireNonNull(streamedQueryResource, "streamedQueryResource"); | ||
this.ksqlResource = Objects.requireNonNull(ksqlResource, "ksqlResource"); | ||
this.commandStore = Objects.requireNonNull(commandStore, "commandStore"); | ||
this.serverState = Objects.requireNonNull(serverState, "serverState"); | ||
this.processingLogContext = Objects.requireNonNull( | ||
processingLogContext, | ||
"processingLogContext"); | ||
this.preconditions = Objects.requireNonNull(preconditions, "preconditions"); | ||
this.versionCheckerAgent = | ||
Objects.requireNonNull(versionCheckerAgent, "versionCheckerAgent"); | ||
this.serviceContextBinderFactory = Objects.requireNonNull( | ||
serviceContextBinderFactory, "serviceContextBinderFactory"); | ||
this.securityExtension = Objects.requireNonNull( | ||
securityExtension, "securityExtension" | ||
); | ||
this.ksqlConnect = Objects | ||
.requireNonNull(ksqlConnect, "ksqlConnect"); | ||
this.serviceContext = requireNonNull(serviceContext, "serviceContext"); | ||
this.ksqlConfigNoPort = requireNonNull(ksqlConfig, "ksqlConfig"); | ||
this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine"); | ||
this.commandRunner = requireNonNull(commandRunner, "commandRunner"); | ||
this.rootDocument = requireNonNull(rootDocument, "rootDocument"); | ||
this.statusResource = requireNonNull(statusResource, "statusResource"); | ||
this.streamedQueryResource = requireNonNull(streamedQueryResource, "streamedQueryResource"); | ||
this.ksqlResource = requireNonNull(ksqlResource, "ksqlResource"); | ||
this.commandStore = requireNonNull(commandStore, "commandStore"); | ||
this.serverState = requireNonNull(serverState, "serverState"); | ||
this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext"); | ||
this.preconditions = requireNonNull(preconditions, "preconditions"); | ||
this.versionCheckerAgent = requireNonNull(versionCheckerAgent, "versionCheckerAgent"); | ||
this.serviceContextBinderFactory = | ||
requireNonNull(serviceContextBinderFactory, "serviceContextBinderFactory"); | ||
this.securityExtension = requireNonNull(securityExtension, "securityExtension"); | ||
this.ksqlConnect = requireNonNull(ksqlConnect, "ksqlConnect"); | ||
this.configurables = requireNonNull(configurables, "configurables"); | ||
} | ||
|
||
@Override | ||
public void setupResources(final Configurable<?> config, final KsqlRestConfig appConfig) { | ||
config.register(rootDocument); | ||
config.register(new ServerInfoResource(serviceContext, ksqlConfig)); | ||
config.register(new ServerInfoResource(serviceContext, ksqlConfigNoPort)); | ||
config.register(statusResource); | ||
config.register(ksqlResource); | ||
config.register(streamedQueryResource); | ||
|
@@ -198,6 +195,8 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap | |
@Override | ||
public void start() throws Exception { | ||
super.start(); | ||
final KsqlConfig ksqlConfigWithPort = buildConfigWithPort(); | ||
configurables.forEach(c -> c.configure(ksqlConfigWithPort)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once the Jetty server has been started we know the actual ports, so can now build more complete config and configure those components that are lazily configured. |
||
startKsql(); | ||
commandRunner.start(); | ||
ksqlConnect.startAsync(); | ||
|
@@ -256,11 +255,11 @@ private void initialize() { | |
ProcessingLogServerUtils.maybeCreateProcessingLogTopic( | ||
serviceContext.getTopicClient(), | ||
processingLogContext.getConfig(), | ||
ksqlConfig | ||
ksqlConfigNoPort | ||
); | ||
maybeCreateProcessingLogStream( | ||
processingLogContext.getConfig(), | ||
ksqlConfig, | ||
ksqlConfigNoPort, | ||
ksqlEngine, | ||
commandStore | ||
); | ||
|
@@ -341,7 +340,7 @@ public void configureBaseApplication( | |
new JacksonMessageBodyProvider(JsonMapper.INSTANCE.mapper); | ||
config.register(jsonProvider); | ||
config.register(JsonParseExceptionMapper.class); | ||
config.register(serviceContextBinderFactory.apply(ksqlConfig, securityExtension)); | ||
config.register(serviceContextBinderFactory.apply(ksqlConfigNoPort, securityExtension)); | ||
|
||
// Don't want to buffer rows when streaming JSON in a request to the query resource | ||
config.property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0); | ||
|
@@ -369,7 +368,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { | |
|
||
final StatementParser statementParser = new StatementParser(ksqlEngine); | ||
final TopicAccessValidator topicAccessValidator = | ||
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext); | ||
TopicAccessValidatorFactory.create(ksqlConfigNoPort, serviceContext); | ||
|
||
container.addEndpoint( | ||
ServerEndpointConfig.Builder | ||
|
@@ -382,7 +381,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { | |
@SuppressWarnings("unchecked") | ||
public <T> T getEndpointInstance(final Class<T> endpointClass) { | ||
return (T) new WSQueryEndpoint( | ||
ksqlConfig, | ||
buildConfigWithPort(), | ||
JsonMapper.INSTANCE.mapper, | ||
statementParser, | ||
ksqlEngine, | ||
|
@@ -452,18 +451,12 @@ static KsqlRestApplication buildApplication( | |
final String commandTopic = KsqlInternalTopicUtils.getTopicName( | ||
ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX); | ||
|
||
final StatementParser statementParser = new StatementParser(ksqlEngine); | ||
|
||
final CommandStore commandStore = CommandStore.Factory.create( | ||
commandTopic, | ||
restConfig.getCommandConsumerProperties(), | ||
restConfig.getCommandProducerProperties()); | ||
|
||
final StatementExecutor statementExecutor = new StatementExecutor( | ||
ksqlConfig, | ||
ksqlEngine, | ||
statementParser | ||
); | ||
final StatementExecutor statementExecutor = new StatementExecutor(ksqlEngine); | ||
|
||
final RootDocument rootDocument = new RootDocument(); | ||
|
||
|
@@ -479,9 +472,7 @@ static KsqlRestApplication buildApplication( | |
TopicAccessValidatorFactory.create(ksqlConfig, serviceContext); | ||
|
||
final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( | ||
ksqlConfig, | ||
ksqlEngine, | ||
statementParser, | ||
commandStore, | ||
Duration.ofMillis( | ||
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)), | ||
|
@@ -491,13 +482,12 @@ static KsqlRestApplication buildApplication( | |
); | ||
|
||
final KsqlResource ksqlResource = new KsqlResource( | ||
ksqlConfig, | ||
ksqlEngine, | ||
commandStore, | ||
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), | ||
versionChecker::updateLastRequestTime, | ||
Injectors.DEFAULT, | ||
topicAccessValidator); | ||
topicAccessValidator | ||
); | ||
|
||
final List<String> managedTopics = new LinkedList<>(); | ||
managedTopics.add(commandTopic); | ||
|
@@ -508,7 +498,7 @@ static KsqlRestApplication buildApplication( | |
statementExecutor, | ||
commandStore, | ||
maxStatementRetries, | ||
new ClusterTerminator(ksqlConfig, ksqlEngine, serviceContext, managedTopics), | ||
new ClusterTerminator(ksqlEngine, serviceContext, managedTopics), | ||
serverState | ||
); | ||
|
||
|
@@ -526,6 +516,12 @@ static KsqlRestApplication buildApplication( | |
) | ||
); | ||
|
||
final List<KsqlConfigurable> configurables = ImmutableList.of( | ||
ksqlResource, | ||
streamedQueryResource, | ||
statementExecutor | ||
); | ||
|
||
return new KsqlRestApplication( | ||
serviceContext, | ||
ksqlEngine, | ||
|
@@ -543,23 +539,28 @@ static KsqlRestApplication buildApplication( | |
serverState, | ||
processingLogContext, | ||
preconditions, | ||
ksqlConnect | ||
ksqlConnect, | ||
configurables | ||
); | ||
} | ||
|
||
private void registerCommandTopic() { | ||
|
||
final String commandTopic = commandStore.getCommandTopicName(); | ||
|
||
KsqlInternalTopicUtils.ensureTopic(commandTopic, ksqlConfig, serviceContext.getTopicClient()); | ||
KsqlInternalTopicUtils.ensureTopic( | ||
commandTopic, | ||
ksqlConfigNoPort, | ||
serviceContext.getTopicClient() | ||
); | ||
|
||
final String createCmd = "CREATE STREAM " + COMMANDS_STREAM_NAME | ||
+ " (STATEMENT STRING)" | ||
+ " WITH(VALUE_FORMAT='JSON', KAFKA_TOPIC='" + commandTopic + "');"; | ||
|
||
final ParsedStatement parsed = ksqlEngine.parse(createCmd).get(0); | ||
final PreparedStatement<?> prepared = ksqlEngine.prepare(parsed); | ||
ksqlEngine.execute(ConfiguredStatement.of(prepared, ImmutableMap.of(), ksqlConfig)); | ||
ksqlEngine.execute(ConfiguredStatement.of(prepared, ImmutableMap.of(), ksqlConfigNoPort)); | ||
} | ||
|
||
private static KsqlSecurityExtension loadSecurityExtension(final KsqlConfig ksqlConfig) { | ||
|
@@ -624,4 +625,22 @@ private static void maybeCreateProcessingLogStream( | |
|
||
commandQueue.enqueueCommand(configured.get()); | ||
} | ||
|
||
/** | ||
* Build a complete config with the KS IQ application.server set. | ||
* | ||
* @return true server config. | ||
*/ | ||
private KsqlConfig buildConfigWithPort() { | ||
final Map<String, Object> props = ksqlConfigNoPort.originals(); | ||
|
||
// Wire up KS IQ endpoint discovery to the FIRST listener: | ||
final URL firstListener = getListeners().get(0); | ||
big-andy-coates marked this conversation as resolved.
Show resolved
Hide resolved
|
||
props.put( | ||
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.APPLICATION_SERVER_CONFIG, | ||
firstListener.toString() | ||
); | ||
|
||
return new KsqlConfig(props); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've renamed this to make it clear it does not contain the port info. My thinking being that people may think twice before using it blindly.