Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add constraint to deny drop sources referenced by other CREATE_AS sources #6545

Merged
merged 7 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ public void shouldFailToDescribeSourceViaExecuteStatement() {
4,
1,
"statement",
Collections.emptyList(),
Collections.emptyList()),
Collections.emptyList());
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
Expand Down Expand Up @@ -1352,6 +1353,7 @@ public void shouldDescribeSource() throws Exception {
4,
1,
"sql",
Collections.emptyList(),
Collections.emptyList()
);
final SourceDescriptionEntity entity = new SourceDescriptionEntity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,18 @@ private void printTopicInfo(final SourceDescription source) {
}
}

private void printSourceConstraints(final List<String> sourceConstraints) {
if (!sourceConstraints.isEmpty()) {
writer().println(String.format(
"%n%-20s%n%-20s",
"Sources that have a DROP constraint on this source",
"--------------------------------------------------"
));

sourceConstraints.forEach(sourceName -> writer().println(sourceName));
}
}

private void printQueries(
final List<RunningQuery> queries,
final String type,
Expand Down Expand Up @@ -632,6 +644,8 @@ private void printSourceDescription(final SourceDescription source) {

printSchema(source.getWindowType(), source.getFields(), isTable);

printSourceConstraints(source.getSourceConstraints());

printQueries(source.getReadQueries(), source.getType(), "read");

printQueries(source.getWriteQueries(), source.getType(), "write");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public class ConsoleTest {
2,
1,
"statement",
Collections.emptyList(),
Collections.emptyList());

@Mock
Expand Down Expand Up @@ -529,6 +530,7 @@ public void testPrintSourceDescription() {
1,
1,
"sql statement",
Collections.emptyList(),
Collections.emptyList()),
Collections.emptyList()
)
Expand Down Expand Up @@ -661,7 +663,8 @@ public void testPrintSourceDescription() {
+ " \"partitions\" : 1," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"sql statement\"," + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]" + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]," + NEWLINE
+ " \"sourceConstraints\" : [ ]" + NEWLINE
+ " }," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
Expand Down Expand Up @@ -800,7 +803,8 @@ public void testPrintConnectorDescription() {
+ " \"partitions\" : 2," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"statement\"," + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]" + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]," + NEWLINE
+ " \"sourceConstraints\" : [ ]" + NEWLINE
+ " } ]," + NEWLINE
+ " \"topics\" : [ \"a-jdbc-topic\" ]," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
Expand Down Expand Up @@ -1104,7 +1108,8 @@ public void shouldPrintTopicDescribeExtended() {
new QueryOffsetSummary(
"consumer2",
ImmutableList.of())
)),
),
ImmutableList.of("S1", "S2")),
Collections.emptyList()
))
);
Expand Down Expand Up @@ -1204,7 +1209,8 @@ public void shouldPrintTopicDescribeExtended() {
+ " }, {" + NEWLINE
+ " \"groupId\" : \"consumer2\"," + NEWLINE
+ " \"topicSummaries\" : [ ]" + NEWLINE
+ " } ]" + NEWLINE
+ " } ]," + NEWLINE
+ " \"sourceConstraints\" : [ \"S1\", \"S2\" ]" + NEWLINE
+ " }," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
Expand All @@ -1224,6 +1230,11 @@ public void shouldPrintTopicDescribeExtended() {
+ " f_0 | VARCHAR(STRING) " + NEWLINE
+ "-----------------------------------------" + NEWLINE
+ "" + NEWLINE
+ "Sources that have a DROP constraint on this source" + NEWLINE
+ "--------------------------------------------------" + NEWLINE
+ "S1" + NEWLINE
+ "S2" + NEWLINE
+ "" + NEWLINE
+ "Queries that read from this TABLE" + NEWLINE
+ "-----------------------------------" + NEWLINE
+ "readId (" + AGGREGATE_STATUS +") : read query" + NEWLINE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.confluent.ksql.util.DuplicateColumnException;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.Set;

/**
* Execute DDL Commands
Expand All @@ -48,7 +49,7 @@ public class DdlCommandExec {
private final MutableMetaStore metaStore;

public DdlCommandExec(final MutableMetaStore metaStore) {
this.metaStore = metaStore;
this.metaStore = Objects.requireNonNull(metaStore, "metaStore");
}

/**
Expand All @@ -57,17 +58,25 @@ public DdlCommandExec(final MutableMetaStore metaStore) {
public DdlCommandResult execute(
final String sql,
final DdlCommand ddlCommand,
final boolean withQuery) {
return new Executor(sql, withQuery).execute(ddlCommand);
final boolean withQuery,
final Set<SourceName> withQuerySources
) {
return new Executor(sql, withQuery, withQuerySources).execute(ddlCommand);
}

private final class Executor implements io.confluent.ksql.execution.ddl.commands.Executor {
private final String sql;
private final boolean withQuery;
private final Set<SourceName> withQuerySources;

private Executor(final String sql, final boolean withQuery) {
private Executor(
final String sql,
final boolean withQuery,
final Set<SourceName> withQuerySources
) {
this.sql = Objects.requireNonNull(sql, "sql");
this.withQuery = withQuery;
this.withQuerySources = Objects.requireNonNull(withQuerySources, "withQuerySources");
}

@Override
Expand All @@ -81,6 +90,7 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre
getKsqlTopic(createStream)
);
metaStore.putSource(ksqlStream, createStream.isOrReplace());
metaStore.addSourceReferences(ksqlStream.getName(), withQuerySources);
return new DdlCommandResult(true, "Stream created");
}

Expand All @@ -95,6 +105,7 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
getKsqlTopic(createTable)
);
metaStore.putSource(ksqlTable, createTable.isOrReplace());
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);
return new DdlCommandResult(true, "Table created");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metrics.StreamsErrorCollector;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.DefaultKsqlParser;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
Expand Down Expand Up @@ -219,9 +220,11 @@ DdlCommand createDdlCommand(
String executeDdl(
final String sqlExpression,
final DdlCommand command,
final boolean withQuery
final boolean withQuery,
final Set<SourceName> withQuerySources
) {
final DdlCommandResult result = ddlCommandExec.execute(sqlExpression, command, withQuery);
final DdlCommandResult result =
ddlCommandExec.execute(sqlExpression, command, withQuery, withQuerySources);
if (!result.isSuccess()) {
throw new KsqlStatementException(result.getMessage(), sqlExpression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PlanSummary;
import io.confluent.ksql.util.TransientQueryMetadata;

import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -104,15 +106,16 @@ ExecuteResult execute(final KsqlPlan plan) {
if (!plan.getQueryPlan().isPresent()) {
final String ddlResult = plan
.getDdlCommand()
.map(ddl -> executeDdl(ddl, plan.getStatementText(), false))
.map(ddl -> executeDdl(ddl, plan.getStatementText(), false, Collections.emptySet()))
.orElseThrow(
() -> new IllegalStateException(
"DdlResult should be present if there is no physical plan."));
return ExecuteResult.of(ddlResult);
}

final QueryPlan queryPlan = plan.getQueryPlan().get();
plan.getDdlCommand().map(ddl -> executeDdl(ddl, plan.getStatementText(), true));
plan.getDdlCommand().map(ddl ->
executeDdl(ddl, plan.getStatementText(), true, queryPlan.getSources()));
return ExecuteResult.of(executePersistentQuery(queryPlan, plan.getStatementText()));
}

Expand Down Expand Up @@ -362,10 +365,11 @@ private static Set<SourceName> getSourceNames(final PlanNode outputNode) {
private String executeDdl(
final DdlCommand ddlCommand,
final String statementText,
final boolean withQuery
final boolean withQuery,
final Set<SourceName> withQuerySources
) {
try {
return engineContext.executeDdl(statementText, ddlCommand, withQuery);
return engineContext.executeDdl(statementText, ddlCommand, withQuery, withQuerySources);
} catch (final KsqlStatementException e) {
throw e;
} catch (final Exception e) {
Expand Down
Loading