-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added self documentation to the KSQL config public config variables. #422
Changes from 6 commits
db38492
17de98e
34f2231
b155a65
94cc2db
02a2f32
b3438c6
5104828
7026f77
ae5689a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,18 +36,14 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { | |
|
||
public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS"; | ||
public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions"; | ||
public static final String DEFAULT_SINK_NUMBER_OF_PARTITIONS = "ksql.sink.partitions.default"; | ||
|
||
public static final String SINK_NUMBER_OF_REPLICATIONS = "REPLICATIONS"; | ||
public static final String SINK_NUMBER_OF_REPLICATIONS_PROPERTY = "ksql.sink.replications"; | ||
public static final String DEFAULT_SINK_NUMBER_OF_REPLICATIONS = "ksql.sink.replications.default"; | ||
public static final String SINK_NUMBER_OF_REPLICAS = "REPLICAS"; | ||
public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas"; | ||
|
||
public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = | ||
"WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION"; | ||
public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY = | ||
"ksql.sink.window.change.log.additional.retention"; | ||
public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = | ||
"ksql.sink.window.change.log.additional.retention.default"; | ||
|
||
public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog"; | ||
|
||
|
@@ -57,111 +53,104 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { | |
|
||
public static final String | ||
KSQL_SERVICE_ID_CONFIG = "ksql.service.id"; | ||
public static final ConfigDef.Type | ||
KSQL_SERVICE_ID_TYPE = ConfigDef.Type.STRING; | ||
public static final String | ||
KSQL_SERVICE_ID_DEFAULT = "ksql_"; | ||
public static final ConfigDef.Importance | ||
KSQL_SERVICE_ID_IMPORTANCE = ConfigDef.Importance.MEDIUM; | ||
public static final String | ||
KSQL_SERVICE_ID_DOC = | ||
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in " | ||
+ "this service."; | ||
|
||
public static final String | ||
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG = "ksql.persistent.prefix"; | ||
public static final ConfigDef.Type | ||
KSQL_PERSISTENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING; | ||
public static final String | ||
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT = "query_"; | ||
public static final ConfigDef.Importance | ||
KSQL_PERSISTENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM; | ||
public static final String | ||
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC = | ||
"Second part of the prefix for persitent queries."; | ||
|
||
public static final String | ||
KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG = "ksql.transient.prefix"; | ||
public static final ConfigDef.Type | ||
KSQL_TRANSIENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING; | ||
public static final String | ||
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT = "transient_"; | ||
public static final ConfigDef.Importance | ||
KSQL_TRANSIENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM; | ||
public static final String | ||
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DOC = | ||
"Second part of the prefix for transient queries."; | ||
|
||
public static final String | ||
KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG = "ksql.statestore.suffix"; | ||
public static final ConfigDef.Type | ||
KSQL_TABLE_STATESTORE_NAME_SUFFIX_TYPE = ConfigDef.Type.STRING; | ||
public static final String | ||
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "transient_"; | ||
public static final ConfigDef.Importance | ||
KSQL_TABLE_STATESTORE_NAME_SUFFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM; | ||
public static final String | ||
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DOC = | ||
"Suffix for state store names in Tables."; | ||
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "_ksql_statestore"; | ||
|
||
public int defaultSinkNumberOfPartitions = 4; | ||
public short defaultSinkNumberOfReplications = 1; | ||
private static int defaultSinkNumberOfPartitions = 4; | ||
private static short defaultSinkNumberOfReplications = 1; | ||
// TODO: Find out the best default value. | ||
public long defaultSinkWindowChangeLogAdditionalRetention = 1000000; | ||
private static long defaultSinkWindowChangeLogAdditionalRetention = 1000000; | ||
|
||
public String defaultAutoOffsetRestConfig = "latest"; | ||
public long defaultCommitIntervalMsConfig = 2000; | ||
public long defaultCacheMaxBytesBufferingConfig = 10000000; | ||
public int defaultNumberOfStreamsThreads = 4; | ||
private static String defaultAutoOffsetRestConfig = "latest"; | ||
private static long defaultCommitIntervalMsConfig = 2000; | ||
private static long defaultCacheMaxBytesBufferingConfig = 10000000; | ||
private static int defaultNumberOfStreamsThreads = 4; | ||
|
||
Map<String, Object> ksqlConfigProps; | ||
Map<String, Object> ksqlStreamConfigProps; | ||
|
||
private static final ConfigDef CONFIG_DEF = new ConfigDef(); | ||
private static final ConfigDef CONFIG_DEF; | ||
|
||
static { | ||
CONFIG_DEF = new ConfigDef() | ||
.define(KSQL_SERVICE_ID_CONFIG, | ||
ConfigDef.Type.STRING, | ||
KSQL_SERVICE_ID_DEFAULT, | ||
ConfigDef.Importance.MEDIUM, | ||
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in " | ||
+ "this service.") | ||
|
||
.define(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, | ||
ConfigDef.Type.STRING, | ||
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, | ||
ConfigDef.Importance.MEDIUM, | ||
"Second part of the prefix for persitent queries. For instance if the prefix is transient_" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you mean There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's the second part. The first part come from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. I am still not sure I follow. I think it would make sense for the example to include the full chain, so that it is clearer to the user how it all comes together at each stage. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added more details with an example. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The doc string here as written is:
Did you really mean for the example prefix to be 'transient_query_'? |
||
+ "query_ the query name will be ksql_query_1.") | ||
.define(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, | ||
ConfigDef.Type.STRING, | ||
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT, | ||
ConfigDef.Importance.MEDIUM, | ||
"Second part of the prefix for transient queries. For instance if the prefix is " | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for updating the example. It makes sense now! |
||
+ "transient_ the query name would be ksql_transient_4120896722607083946_1509389010601") | ||
.define(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, | ||
ConfigDef.Type.STRING, | ||
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT, | ||
ConfigDef.Importance.MEDIUM, | ||
"Suffix for state store names in Tables. For instance if the suffix is _ksql_statestore the state " | ||
+ "store name would be ksql_query_1_ksql_statestore" | ||
+ "_ksql_statestore ") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you add this extra line by mistake? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch for |
||
.define(SINK_NUMBER_OF_PARTITIONS_PROPERTY, | ||
ConfigDef.Type.INT, | ||
defaultSinkNumberOfPartitions, | ||
ConfigDef.Importance.MEDIUM, | ||
"The default number of partitions for the topics created by KSQL.") | ||
.define(SINK_NUMBER_OF_REPLICAS_PROPERTY, | ||
ConfigDef.Type.SHORT, | ||
defaultSinkNumberOfReplications, | ||
ConfigDef.Importance.MEDIUM, | ||
"The default number of replicas for the topics created by KSQL." | ||
) | ||
.define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, | ||
ConfigDef.Type.LONG, | ||
defaultSinkWindowChangeLogAdditionalRetention, | ||
ConfigDef.Importance.MEDIUM, | ||
"The default window change log additional retention time." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You probably want to drop the 'default' here. Also, it may make sense to expand the description. It doesn't make sense as is. Perhaps a sentence like 'The amount of time to retain window change logs'. Actually I am still not sure what that means Also, there should be a time unit in the name of the config and the description. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, added the time unit and more details to the description. |
||
) | ||
; | ||
} | ||
|
||
|
||
public KsqlConfig(Map<?, ?> props) { | ||
super(CONFIG_DEF, props); | ||
|
||
ksqlConfigProps = new HashMap<>(); | ||
ksqlStreamConfigProps = new HashMap<>(); | ||
ksqlConfigProps.put(KSQL_SERVICE_ID_CONFIG, KSQL_SERVICE_ID_DEFAULT); | ||
ksqlConfigProps.put(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT); | ||
ksqlConfigProps.put(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT); | ||
ksqlConfigProps.put(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT); | ||
|
||
if (props.containsKey(DEFAULT_SINK_NUMBER_OF_PARTITIONS)) { | ||
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, | ||
Integer.parseInt(props.get(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString())); | ||
} else { | ||
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, defaultSinkNumberOfPartitions); | ||
} | ||
|
||
if (props.containsKey(DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) { | ||
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, | ||
Short.parseShort(props.get(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString())); | ||
} else { | ||
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, defaultSinkNumberOfReplications); | ||
} | ||
|
||
if (props.containsKey(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION)) { | ||
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, | ||
Long.parseLong(props.get(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString())); | ||
} else { | ||
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, | ||
defaultSinkWindowChangeLogAdditionalRetention); | ||
} | ||
ksqlConfigProps.putAll(super.values()); | ||
|
||
ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultAutoOffsetRestConfig); | ||
ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, defaultCommitIntervalMsConfig); | ||
ksqlStreamConfigProps.put( | ||
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, defaultCacheMaxBytesBufferingConfig); | ||
ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, defaultNumberOfStreamsThreads); | ||
|
||
for (Map.Entry<?, ?> entry : props.entrySet()) { | ||
for (Map.Entry<?, ?> entry : originals().entrySet()) { | ||
final String key = entry.getKey().toString(); | ||
if (key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) { | ||
ksqlConfigProps.put(key, entry.getValue()); | ||
} else { | ||
if (!key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) { | ||
ksqlStreamConfigProps.put(key, entry.getValue()); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/** | ||
* Copyright 2017 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
**/ | ||
|
||
package io.confluent.ksql.util; | ||
|
||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.junit.Test; | ||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class KsqlConfigTest { | ||
|
||
@Test | ||
public void shouldSetInitialValuesCorrectly() { | ||
Map<String, Object> initialProps = new HashMap<>(); | ||
initialProps.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 10); | ||
initialProps.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 3); | ||
initialProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 800); | ||
initialProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5); | ||
|
||
KsqlConfig ksqlConfig = new KsqlConfig(initialProps); | ||
|
||
assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY), equalTo(10)); | ||
assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY), equalTo((short) 3)); | ||
|
||
} | ||
|
||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name and the value of this property don't seem to line up. What does the
REPLICAS
value mean exactly? Would it be more appropriate to have a numeric value? This applies to theSINK_NUMBER_OF_PARTITIONS
property above as well. Also where is thisSINK_NUMBER_OF_REPLICAS
string used?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are not part of the config variables that can be set by the user via the KsqlConfig but they will be set in the query to specify the number of partitions, replications and the timestamp column for the result stream/table. This is just a constant to represent the name of the attributes.
Here is an example:
CREATE STREAM enrichedpv1 with (timestamp='pageid', partitions = 3) AS SELECT users.userid AS userid, pageid as pageid, region, gender FROM pageview LEFT JOIN users ON pageview.userid = users.userid;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. Then why is it in
KsqlConfig
? There is also aSINK_NUMBER_OF_REPLICAS_PROPERTY
which I assume does go in theConfigDef
? It would be better to have the literals that are in the query moved to a separate file.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I'll move them to a new file, KsqlConstants.