diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java index 0f698c0d4998..72728456b29e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java @@ -144,7 +144,7 @@ public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode Formats.from(outputNode.getKsqlTopic()), outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(), Optional.of(outputNode.getOrReplace()), - Optional.of(CreateTable.Type.NORMAL.name()) + Optional.of(false) ); } @@ -192,7 +192,7 @@ public CreateTableCommand createTableCommand( buildFormats(statement.getName(), schema, props, ksqlConfig), getWindowInfo(props), Optional.of(statement.isOrReplace()), - Optional.of(statement.getType().name()) + Optional.of(statement.isSource()) ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java index e1dcb0f373d0..5b863f6addaa 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java @@ -15,7 +15,6 @@ package io.confluent.ksql.ddl.commands; -import com.google.common.base.Enums; import io.confluent.ksql.execution.ddl.commands.AlterSourceCommand; import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand; import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand; @@ -33,7 +32,6 @@ import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.name.SourceName; -import io.confluent.ksql.parser.tree.CreateTable; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.serde.KeyFormat; @@ -121,10 +119,6 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable) sourceName, sourceType.toLowerCase())); } - final CreateTable.Type tableType = createTable.getType() - .map(typeStr -> Enums.getIfPresent(CreateTable.Type.class, typeStr).orNull()) - .orElse(CreateTable.Type.NORMAL); - final KsqlTable ksqlTable = new KsqlTable<>( sql, createTable.getSourceName(), @@ -132,8 +126,7 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable) createTable.getTimestampColumn(), withQuery, getKsqlTopic(createTable), - tableType == CreateTable.Type.SOURCE, - tableType == CreateTable.Type.SOURCE + createTable.isSource() ); metaStore.putSource(ksqlTable, createTable.isOrReplace()); metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java index 0801524d88b1..8ffa5b9aaaff 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java @@ -198,7 +198,7 @@ public void shouldCreateCommandForCreateTable() { TableElements.of( tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final DdlCommand result = commandFactories @@ -216,7 +216,7 @@ public void shouldCreateCommandForCreateSourceTable() { TableElements.of( tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties, CreateTable.Type.SOURCE); + false, true, withProperties, true); // When: final DdlCommand result = commandFactories @@ -234,7 +234,7 @@ public void shouldCreateCommandForCreateTableWithOverriddenProperties() { TableElements.of( tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: commandFactories.create(sqlExpression, statement, SessionConfig.of(ksqlConfig, OVERRIDES)); @@ -373,7 +373,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromOverridesNotConfi final DdlStatement statement = new CreateTable(SOME_NAME, ELEMENTS_WITH_PK, - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final DdlCommand cmd = commandFactories diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java index 2b14d130f4ae..182521e386b7 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java @@ -293,7 +293,7 @@ public void shouldCreateCommandForCreateTable() { TableElements.of( tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final CreateTableCommand result = createSourceFactory @@ -302,6 +302,26 @@ public void shouldCreateCommandForCreateTable() { // Then: assertThat(result.getSourceName(), is(SOME_NAME)); assertThat(result.getTopicName(), is(TOPIC_NAME)); + assertThat(result.isSource(), is(false)); + } + + @Test + public void shouldCreateCommandForCreateSourceTable() { + // Given: + final CreateTable ddlStatement = new CreateTable(SOME_NAME, + TableElements.of( + tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), + tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), + false, true, withProperties, true); + + // When: + final CreateTableCommand result = createSourceFactory + .createTableCommand(ddlStatement, ksqlConfig); + + // Then: + assertThat(result.getSourceName(), is(SOME_NAME)); + assertThat(result.getTopicName(), is(TOPIC_NAME)); + assertThat(result.isSource(), is(true)); } @Test @@ -380,7 +400,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromPropertiesNotConf final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory @@ -402,7 +422,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() { final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory @@ -418,7 +438,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromDefaultConfig() { // Given: final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory @@ -450,7 +470,7 @@ public void shouldThrowOnNoElementsInCreateTable() { // Given: final CreateTable statement = new CreateTable(SOME_NAME, TableElements.of(), - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final Exception e = assertThrows( @@ -480,7 +500,7 @@ public void shouldNotThrowWhenThereAreElementsInCreateTable() { // Given: final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: createSourceFactory.createTableCommand(statement, ksqlConfig); @@ -615,7 +635,7 @@ public void shouldBuildTimestampColumnForTable() { ); final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS, - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory.createTableCommand( @@ -737,7 +757,7 @@ public void shouldCreateValueSerdeToValidateValueFormatCanHandleValueSchema() { // Given: givenCommandFactoriesWithMocks(); final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true, - withProperties, CreateTable.Type.NORMAL); + withProperties, false); when(valueSerdeFactory.create( FormatInfo.of(JSON.name()), @@ -1017,7 +1037,7 @@ public void shouldThrowIfTableIsMissingPrimaryKey() { final CreateTable statement = new CreateTable(SOME_NAME, noKey, - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final Exception e = assertThrows( @@ -1067,7 +1087,7 @@ public void shouldNotThrowOnCreateTableIfNotExistsIsSet() { TableElements.of( tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties, CreateTable.Type.NORMAL); + false, true, withProperties, false); // When: final CreateTableCommand result = createSourceFactory @@ -1084,7 +1104,7 @@ public void shouldThrowIfTableExists() { TableElements.of( tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), - false, false, withProperties, CreateTable.Type.NORMAL); + false, false, withProperties, false); // When: final Exception e = assertThrows( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java index 93972945804c..883512fd8431 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java @@ -24,7 +24,6 @@ import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; -import io.confluent.ksql.parser.tree.CreateTable; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlType; @@ -273,8 +272,7 @@ public void shouldAddNormalTableWhenNoTypeIsSpecified() { // Then: final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1")); - assertThat(ksqlTable.isReadOnly(), is(false)); - assertThat(ksqlTable.isMaterialized(), is(false)); + assertThat(ksqlTable.isSource(), is(false)); } @Test @@ -283,7 +281,7 @@ public void shouldAddSourceTable() { final CreateTableCommand cmd = buildCreateTable( SourceName.of("t1"), false, - CreateTable.Type.SOURCE.name() + true ); // When: @@ -291,19 +289,18 @@ public void shouldAddSourceTable() { // Then: final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1")); - assertThat(ksqlTable.isReadOnly(), is(true)); - assertThat(ksqlTable.isMaterialized(), is(true)); + assertThat(ksqlTable.isSource(), is(true)); } @Test public void shouldThrowOnDropTableWhenConstraintExist() { // Given: final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"), - false, CreateTable.Type.NORMAL.name()); + false, false); final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"), - false, CreateTable.Type.NORMAL.name()); + false, false); final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"), - false, CreateTable.Type.NORMAL.name()); + false, false); cmdExec.execute(SQL_TEXT, table1, true, Collections.emptySet()); cmdExec.execute(SQL_TEXT, table2, true, Collections.singleton(SourceName.of("t1"))); cmdExec.execute(SQL_TEXT, table3, true, Collections.singleton(SourceName.of("t1"))); @@ -588,18 +585,18 @@ private void givenCreateWindowedTable() { ), Optional.of(windowInfo), Optional.of(false), - Optional.of(CreateTable.Type.NORMAL.name()) + Optional.of(false) ); } private void givenCreateTable() { - createTable = buildCreateTable(TABLE_NAME, false, CreateTable.Type.NORMAL.name()); + createTable = buildCreateTable(TABLE_NAME, false, false); } private CreateTableCommand buildCreateTable( final SourceName sourceName, final boolean allowReplace, - final String type + final Boolean isSource ) { return new CreateTableCommand( sourceName, @@ -614,7 +611,7 @@ private CreateTableCommand buildCreateTable( ), Optional.empty(), Optional.of(allowReplace), - Optional.ofNullable(type) + Optional.ofNullable(isSource) ); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java index af0e71cffba0..c2f02605484a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java @@ -660,7 +660,7 @@ public void shouldRewriteCreateTable() { false, false, sourceProperties, - CreateTable.Type.NORMAL + false ); when(mockRewriter.apply(tableElement1, context)).thenReturn(rewrittenTableElement1); when(mockRewriter.apply(tableElement2, context)).thenReturn(rewrittenTableElement2); @@ -679,7 +679,7 @@ public void shouldRewriteCreateTable() { false, false, sourceProperties, - CreateTable.Type.NORMAL + false ) ) ); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index c35ef059a0d6..44a430294b4a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -245,7 +245,6 @@ public void shouldBuildSchemaKTableWhenKTableSource() { KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) ), - false, false ); diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java index acfc763d69d2..d658d1384e3c 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java @@ -28,7 +28,7 @@ @JsonIgnoreProperties({"keyField"}) // Removed at version 0.10 @Immutable public class CreateTableCommand extends CreateSourceCommand { - private final Optional type; + private final Optional isSource; public CreateTableCommand( @JsonProperty(value = "sourceName", required = true) final SourceName sourceName, @@ -38,7 +38,7 @@ public CreateTableCommand( @JsonProperty(value = "formats", required = true) final Formats formats, @JsonProperty(value = "windowInfo") final Optional windowInfo, @JsonProperty(value = "orReplace", defaultValue = "false") final Optional orReplace, - @JsonProperty(value = "type", defaultValue = "NORMAL") final Optional type + @JsonProperty(value = "isSource", defaultValue = "false") final Optional isSource ) { super( sourceName, @@ -54,11 +54,14 @@ public CreateTableCommand( throw new UnsupportedOperationException("Tables require key columns"); } - this.type = type; + this.isSource = isSource; } - public Optional getType() { - return type; + // This can be in CreateSourceCommand, but it fails deserializing the JSON property when + // loading a CreateStreamCommand because it is not supported there yet. We should move this + // source variable and method to the CreateSourceCommand after supporting source streams. + public Boolean isSource() { + return isSource.orElse(false); } @Override diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java index 6176ad4c1280..71f010bdc5d2 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java @@ -107,5 +107,5 @@ public String getKsqlType() { /** * @return returns true if this source is read-only */ - boolean isReadOnly(); + boolean isSource(); } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java index b2912e933c83..1915ab199c64 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java @@ -24,8 +24,6 @@ @Immutable public class KsqlTable extends StructuredDataSource { - private final boolean materialized; - public KsqlTable( final String sqlExpression, final SourceName datasourceName, @@ -33,8 +31,7 @@ public KsqlTable( final Optional timestampExtractionPolicy, final boolean isKsqlSink, final KsqlTopic ksqlTopic, - final boolean readOnly, - final boolean materialized + final boolean isSourceTable ) { super( sqlExpression, @@ -44,14 +41,8 @@ public KsqlTable( DataSourceType.KTABLE, isKsqlSink, ksqlTopic, - readOnly + isSourceTable ); - - this.materialized = materialized; - } - - public boolean isMaterialized() { - return materialized; } @Override @@ -63,8 +54,7 @@ public DataSource with(final String sql, final LogicalSchema schema) { getTimestampColumn(), isCasTarget(), getKsqlTopic(), - isReadOnly(), - isMaterialized() + isSource() ); } } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java index 40c794995a62..e11f2afcaa08 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java @@ -47,7 +47,7 @@ abstract class StructuredDataSource implements DataSource { private final KsqlTopic ksqlTopic; private final String sqlExpression; private final boolean casTarget; - private final boolean readOnly; + private final boolean isSource; private static final ImmutableList> PROPERTIES = ImmutableList.of( new Property<>("name", DataSource::getName), @@ -66,7 +66,7 @@ abstract class StructuredDataSource implements DataSource { final DataSourceType dataSourceType, final boolean casTarget, final KsqlTopic ksqlTopic, - final boolean readOnly + final boolean isSource ) { this.sqlExpression = requireNonNull(sqlExpression, "sqlExpression"); this.dataSourceName = requireNonNull(dataSourceName, "dataSourceName"); @@ -75,7 +75,7 @@ abstract class StructuredDataSource implements DataSource { this.dataSourceType = requireNonNull(dataSourceType, "dataSourceType"); this.ksqlTopic = requireNonNull(ksqlTopic, "ksqlTopic"); this.casTarget = casTarget; - this.readOnly = readOnly; + this.isSource = isSource; if (schema.valueContainsAny(SystemColumns.systemColumnNames())) { throw new IllegalArgumentException("Schema contains system columns in value schema"); @@ -130,8 +130,8 @@ public String getSqlExpression() { } @Override - public boolean isReadOnly() { - return readOnly; + public boolean isSource() { + return isSource; } @Override diff --git a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java b/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java index f9061c191fc0..3912d23d92a1 100644 --- a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java +++ b/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java @@ -212,7 +212,6 @@ public void shouldEnforceSameType() { Optional.empty(), true, topic, - false, false ); diff --git a/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java b/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java index 7d18c2827491..75df78465f97 100644 --- a/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java +++ b/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java @@ -117,7 +117,6 @@ public static MutableMetaStore getNewMetaStore( Optional.empty(), false, ksqlTopic2, - false, false ); @@ -194,7 +193,6 @@ public static MutableMetaStore getNewMetaStore( Optional.empty(), false, ksqlTopic3, - false, false ); @@ -299,7 +297,6 @@ public static MutableMetaStore getNewMetaStore( Optional.empty(), false, ksqlTopic5, - false, false ); metaStore.putSource(ksqlTable5, false); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index e90e0dfb3a47..d8c7108267a0 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -281,10 +281,6 @@ public Node visitCreateTable(final SqlBaseParser.CreateTableContext context) { final Map properties = processTableProperties(context.tableProperties()); - final CreateTable.Type type = context.SOURCE() != null - ? CreateTable.Type.SOURCE - : CreateTable.Type.NORMAL; - return new CreateTable( getLocation(context), ParserUtil.getSourceName(context.sourceName()), @@ -292,7 +288,7 @@ public Node visitCreateTable(final SqlBaseParser.CreateTableContext context) { context.REPLACE() != null, context.EXISTS() != null, CreateSourceProperties.from(properties), - type + context.SOURCE() != null ); } @@ -1444,7 +1440,7 @@ public Node visitAssertTable(final AssertTableContext context) { false, false, CreateSourceProperties.from(properties), - CreateTable.Type.NORMAL + false ); return new AssertTable(getLocation(context), createTable); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 06e4add0c479..ef9f69494804 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -260,7 +260,7 @@ protected Void visitCreateStream(final CreateStream node, final Integer indent) @Override protected Void visitCreateTable(final CreateTable node, final Integer indent) { formatCreate(node, - node.getType() == CreateTable.Type.SOURCE + node.isSource() ? "SOURCE TABLE" : "TABLE" ); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java index 2a5d74a11c3a..f14c7aeb028c 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java @@ -32,6 +32,7 @@ public abstract class CreateSource extends Statement { private final boolean notExists; private final CreateSourceProperties properties; private final boolean orReplace; + private final boolean isSource; CreateSource( final Optional location, @@ -39,7 +40,8 @@ public abstract class CreateSource extends Statement { final TableElements elements, final boolean orReplace, final boolean notExists, - final CreateSourceProperties properties + final CreateSourceProperties properties, + final boolean isSource ) { super(location); this.name = requireNonNull(name, "name"); @@ -47,6 +49,7 @@ public abstract class CreateSource extends Statement { this.orReplace = orReplace; this.notExists = notExists; this.properties = requireNonNull(properties, "properties"); + this.isSource = isSource; } public CreateSourceProperties getProperties() { @@ -69,11 +72,15 @@ public boolean isNotExists() { return notExists; } + public boolean isSource() { + return isSource; + } + public abstract CreateSource copyWith(TableElements elements, CreateSourceProperties properties); @Override public int hashCode() { - return Objects.hash(name, elements, orReplace, notExists, properties); + return Objects.hash(name, elements, orReplace, notExists, properties, isSource); } @Override @@ -89,6 +96,7 @@ public boolean equals(final Object o) { && orReplace == that.orReplace && Objects.equals(name, that.name) && Objects.equals(elements, that.elements) - && Objects.equals(properties, that.properties); + && Objects.equals(properties, that.properties) + && isSource == that.isSource; } } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java index e9d7df9466a4..5f7c4a14dc58 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java @@ -46,7 +46,7 @@ public CreateStream( final boolean notExists, final CreateSourceProperties properties ) { - super(location, name, elements, orReplace, notExists, properties); + super(location, name, elements, orReplace, notExists, properties, false); throwOnPrimaryKeys(elements); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java index 0b63d41e14bf..9f81e5139f85 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java @@ -27,22 +27,15 @@ @Immutable public class CreateTable extends CreateSource implements ExecutableDdlStatement { - public enum Type { - SOURCE, - NORMAL - } - - private final Type type; - public CreateTable( final SourceName name, final TableElements elements, final boolean orReplace, final boolean notExists, final CreateSourceProperties properties, - final Type type + final boolean isSource ) { - this(Optional.empty(), name, elements, orReplace, notExists, properties, type); + this(Optional.empty(), name, elements, orReplace, notExists, properties, isSource); } public CreateTable( @@ -52,18 +45,13 @@ public CreateTable( final boolean orReplace, final boolean notExists, final CreateSourceProperties properties, - final Type type + final boolean isSource ) { - super(location, name, elements, orReplace, notExists, properties); - this.type = type; + super(location, name, elements, orReplace, notExists, properties, isSource); throwOnNonPrimaryKeys(elements); } - public Type getType() { - return type; - } - @Override public CreateSource copyWith( final TableElements elements, @@ -76,7 +64,7 @@ public CreateSource copyWith( isOrReplace(), isNotExists(), properties, - type); + isSource()); } @Override @@ -103,7 +91,7 @@ public String toString() { .add("orReplace", isOrReplace()) .add("notExists", isNotExists()) .add("properties", getProperties()) - .add("type", type) + .add("isSource", isSource()) .toString(); } diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java index 07cbc6a73c46..d33ee92bb71e 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java @@ -614,7 +614,7 @@ public void shouldCreateSourceTable() { final CreateTable result = (CreateTable) builder.buildStatement(stmt); // Then: - assertThat(result.getType(), is(CreateTable.Type.SOURCE)); + assertThat(result.isSource(), is(true)); } @Test @@ -627,7 +627,7 @@ public void shouldCreateNormalTable() { final CreateTable result = (CreateTable) builder.buildStatement(stmt); // Then: - assertThat(result.getType(), is(CreateTable.Type.NORMAL)); + assertThat(result.isSource(), is(false)); } @Test diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 91dcfb671643..963d774bd298 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -377,7 +377,7 @@ public void testCreateSourceTable() { "value_format='json', partitions=1, replicas=-1);", metaStore).getStatement(); // Then: - assertThat(stmt.getType(), is(CreateTable.Type.SOURCE)); + assertThat(stmt.isSource(), is(true)); } @Test diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index da197175be6d..62e5a5fc756a 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -203,7 +203,6 @@ public void setUp() { Optional.empty(), false, ksqlTopicItems, - false, false ); @@ -216,7 +215,6 @@ public void setUp() { Optional.empty(), false, ksqlTopicItems, - false, false ); @@ -296,7 +294,7 @@ public void shouldFormatCreateSourceTableStatement() { false, false, props, - CreateTable.Type.SOURCE); + true); // When: final String sql = SqlFormatter.formatSql(createTable); @@ -320,7 +318,7 @@ public void shouldFormatCreateOrReplaceTableStatement() { true, false, props, - CreateTable.Type.NORMAL); + false); // When: final String sql = SqlFormatter.formatSql(createTable); @@ -346,7 +344,7 @@ public void shouldFormatCreateTableStatementWithExplicitTimestamp() { false, false, props, - CreateTable.Type.NORMAL); + false); // When: final String sql = SqlFormatter.formatSql(createTable); @@ -366,7 +364,7 @@ public void shouldFormatCreateTableStatementWithExplicitKey() { false, false, SOME_WITH_PROPS, - CreateTable.Type.NORMAL); + false); // When: final String sql = SqlFormatter.formatSql(createTable); @@ -385,7 +383,7 @@ public void shouldFormatCreateTableStatementWithImplicitKey() { false, false, SOME_WITH_PROPS, - CreateTable.Type.NORMAL); + false); // When: final String sql = SqlFormatter.formatSql(createTable); diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java index 56d40d5c5192..04564366d4d3 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java @@ -15,8 +15,6 @@ package io.confluent.ksql.parser.tree; -import static io.confluent.ksql.parser.tree.CreateTable.Type.NORMAL; -import static io.confluent.ksql.parser.tree.CreateTable.Type.SOURCE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; @@ -62,28 +60,28 @@ public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, NORMAL), - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, NORMAL), - new CreateTable(Optional.of(SOME_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, NORMAL), - new CreateTable(Optional.of(OTHER_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, NORMAL) + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false), + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false), + new CreateTable(Optional.of(SOME_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false), + new CreateTable(Optional.of(OTHER_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SourceName.of("jim"), SOME_ELEMENTS, false, true, SOME_PROPS, NORMAL) + new CreateTable(SourceName.of("jim"), SOME_ELEMENTS, false, true, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SOME_NAME, TableElements.of(), false, true, SOME_PROPS, NORMAL) + new CreateTable(SOME_NAME, TableElements.of(), false, true, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, SOME_PROPS, NORMAL) + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SOME_NAME, SOME_ELEMENTS, true, true, SOME_PROPS, SOURCE) + new CreateTable(SOME_NAME, SOME_ELEMENTS, true, true, SOME_PROPS, true) ) .addEqualityGroup( - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, OTHER_PROPS, NORMAL) + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, OTHER_PROPS, false) ) .addEqualityGroup( - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, OTHER_PROPS, SOURCE) + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, OTHER_PROPS, true) ) .testEquals(); } @@ -112,7 +110,7 @@ public void shouldThrowOnNonePrimaryKey() { // When: final ParseFailedException e = assertThrows( ParseFailedException.class, - () -> new CreateTable(SOME_NAME, invalidElements, false, false, SOME_PROPS, NORMAL) + () -> new CreateTable(SOME_NAME, invalidElements, false, false, SOME_PROPS, false) ); // Then: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index b7acdb02be7a..28bf8775fd9a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -528,7 +528,7 @@ public void shouldRunCtStatement() { // Given: final PreparedStatement ct = PreparedStatement.of("CT", new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, JSON_PROPS, - CreateTable.Type.NORMAL)); + false)); givenQueryFileParsesTo(ct); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java index 7ce7f92dbde0..0fe01782bfe3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java @@ -162,7 +162,6 @@ public T givenSource( Optional.empty(), false, topic, - false, false ); break; diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java index fe9a06d4e24b..6ab99b0a66e3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java @@ -1122,7 +1122,6 @@ private void givenDataSourceWithSchema( Optional.empty(), false, topic, - false, false ); } else { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 823dd06aa0c8..4625afa534a0 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -2641,7 +2641,6 @@ private void givenSource( Optional.empty(), false, ksqlTopic, - false, false ); break; diff --git a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json index 6a3fc5271e9d..3b28c70e9c43 100644 --- a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json +++ b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json @@ -182,9 +182,9 @@ "type" : "boolean", "default" : false }, - "type" : { - "type" : "string", - "default" : "NORMAL" + "isSource" : { + "type" : "boolean", + "default" : false } }, "title" : "createTableV1",