Skip to content

Commit

Permalink
feat: add feature flag to disable source table materialization (#8085)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Sep 1, 2021
1 parent a0be1d5 commit 5416cde
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)
.add(KsqlConfig.KSQL_HIDDEN_TOPICS_CONFIG)
.add(KsqlConfig.KSQL_READONLY_TOPICS_CONFIG)
.add(KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
17 changes: 17 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,16 @@ public class KsqlConfig extends AbstractConfig {
"Feature flag for ROWPARTITION and ROWOFFSET pseudocolumns. If enabled, new queries will be"
+ "built with ROWPARTITION and ROWOFFSET pseudocolumns. If off, they will not be.";

public static final String KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED =
"ksql.source.table.materialization.enabled";
private static final Boolean KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DEFAULT = true;
private static final String KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DOC =
"Feature flag that enables table materialization on source tables. Default is true. "
+ "If false, CREATE SOURCE [TABLE|STREAM] statements will be rejected. "
+ "Current CREATE SOURCE TABLE statements found in the KSQL command topic will "
+ "not be materialized and pull queries won't be allowed on them. However, current "
+ "CREATE SOURCE [TABLE|STREAM] statements will continue being read-only.";

public static final String KSQL_SHARED_RUNTIME_ENABLED = "ksql.runtime.feature.shared.enabled";
public static final Boolean KSQL_SHARED_RUNTIME_ENABLED_DEFAULT = false;
public static final String KSQL_SHARED_RUNTIME_ENABLED_DOC =
Expand Down Expand Up @@ -1061,6 +1071,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.MEDIUM,
KSQL_SHARED_RUNTIME_ENABLED_DOC
)
.define(
KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED,
Type.BOOLEAN,
KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DEFAULT,
Importance.LOW,
KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.confluent.ksql.parser.OutputRefinement;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
Expand Down Expand Up @@ -186,6 +187,18 @@ ExecuteResult execute(final KsqlPlan plan) {
return ExecuteResult.of(ddlResult.get());
}

// Do not execute the plan (found on new CST commands or commands read from the command topic)
// for source tables if the feature is disabled. CST will still be read-only, but no query
// must be executed.
if (persistentQueryType == KsqlConstants.PersistentQueryType.CREATE_SOURCE
&& !isSourceTableMaterializationEnabled()) {
LOG.info(String.format(
"Source table query '%s' won't be materialized because '%s' is disabled.",
plan.getStatementText(),
KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED));
return ExecuteResult.of(ddlResult.get());
}

return ExecuteResult.of(executePersistentQuery(
queryPlan,
plan.getStatementText(),
Expand Down Expand Up @@ -473,15 +486,38 @@ private KsqlPlan sourceTablePlan(
queryPlan);
}

private boolean isSourceStreamOrTable(final ConfiguredStatement<?> statement) {
return (statement.getStatement() instanceof CreateStream
&& ((CreateStream) statement.getStatement()).isSource())
|| (statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isSource());
}

private boolean isSourceTableMaterializationEnabled() {
// Do not get overridden configs because this must be set only from the Server side
return config.getConfig(false)
.getBoolean(KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED);
}

// Known to be non-empty
@SuppressWarnings("OptionalGetWithoutIsPresent")
KsqlPlan plan(final ConfiguredStatement<?> statement) {
try {
throwOnNonExecutableStatement(statement);

if (statement.getStatement() instanceof ExecutableDdlStatement) {
if (statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isSource()) {
final boolean isSourceStream = statement.getStatement() instanceof CreateStream
&& ((CreateStream) statement.getStatement()).isSource();

final boolean isSourceTable = statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isSource();

if ((isSourceStream || isSourceTable) && !isSourceTableMaterializationEnabled()) {
throw new KsqlStatementException("Cannot execute command because source table "
+ "materialization is disabled.", statement.getStatementText());
}

if (isSourceTable) {
return sourceTablePlan(statement);
} else {
final DdlCommand ddlCommand = engineContext.createDdlCommand(
Expand Down

0 comments on commit 5416cde

Please sign in to comment.