-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: wire in the KS config needed for point queries (MINOR) #3251
feat: wire in the KS config needed for point queries (MINOR) #3251
Conversation
Point queries need to be able to determine with KSQL node stores the data for a specific key in a state store. Kafka Streams supports this via its interactive queries functionality. For this to work, the KS `application.server` config must be set. This change sees the `application.server` being automatically set by the rest application to the FIRST listener supplied in the standard `listener` config. The `listener` can use automatic port allocation, e.g. if set to `http://localhost:0` then it will auto-allocate a port number. This means the port is not known until the server has been started, which requires the `KsqlConfig` to be injected into certain types once the server is started.
@@ -115,7 +116,7 @@ | |||
|
|||
public static final String COMMANDS_STREAM_NAME = "KSQL_COMMANDS"; | |||
|
|||
private final KsqlConfig ksqlConfig; | |||
private final KsqlConfig ksqlConfigNoPort; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've renamed this to make it clear it does not contain the port info. My thinking being that people may think twice before using it blindly.
@@ -198,6 +195,8 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap | |||
@Override | |||
public void start() throws Exception { | |||
super.start(); | |||
final KsqlConfig ksqlConfigWithPort = buildConfigWithPort(); | |||
configurables.forEach(c -> c.configure(ksqlConfigWithPort)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once the Jetty server has been started we know the actual ports, so can now build more complete config and configure those components that are lazily configured.
@@ -109,7 +109,6 @@ public void setup() { | |||
when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient); | |||
when(serviceContext.getSchemaRegistryClient()).thenReturn(schemaRegistryClient); | |||
clusterTerminator = new ClusterTerminator( | |||
ksqlConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed, as its not used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
Show resolved
Hide resolved
# Conflicts: # ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java # ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java # ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
Description
Point queries need to be able to determine with KSQL node stores the data for a specific key in a state store. Kafka Streams supports this via its interactive queries functionality. For this to work, the KS
application.server
config must be set.This change sees the
application.server
being automatically set by the rest application to the FIRST listener supplied in the standardlistener
config.As the
listener
setting can use automatic port allocation, e.g. if set tohttp://localhost:0
then it will auto-allocate a port number, we might not know the actual port until the Jetty server is started.This causes problems where components require the
KsqlConfig
to be injected at creation time. To get around this I've addedKsqlConfigurable
, which components can implement, meaning they get passed their config after creation, and that config includes the required app server setting with the correct port.The following components currently implement
KsqlConfigurable
:KsqlResource
StreamedQueryResource
StatementExecutor
Strictly speaking, only the last of these currently needs the app server set, as it is the only one the currently starts persistent queries. But I think it's safer, i.e. less likely to cause bugs in the future, if all three are configured with the app server set.
Testing done
Some tests added in this PR. More coming with later point query PRs.
Reviewer checklist