From e27b22cd8ef8c4017a2ae8d0b3869ec2c5482bf1 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Mon, 29 Apr 2019 14:33:40 +0100 Subject: [PATCH] Drop requirement that `CREATE TABLE` statements have a `KEY` set in `WITH` clause. Fixes #2745 --- docs/changelog.rst | 6 +- docs/developer-guide/syntax-reference.rst | 27 ++++----- .../TimestampExtractionPolicyFactory.java | 8 +-- .../commands/AbstractCreateStreamCommand.java | 6 +- .../ksql/ddl/commands/CreateTableCommand.java | 12 +--- .../AbstractCreateStreamCommandTest.java | 38 ++++++++++++ .../ddl/commands/CommandFactoriesTest.java | 59 ------------------- .../ddl/commands/CreateStreamCommandTest.java | 44 ++++++++++++-- .../ddl/commands/CreateTableCommandTest.java | 43 ++++++++++++-- .../query-validation-tests/key-field.json | 52 ++++++---------- .../metastore/model/MetaStoreMatchers.java | 8 +++ 11 files changed, 164 insertions(+), 139 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 0939436c75f0..18e3f9f58f05 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -6,6 +6,10 @@ Version 5.3.0 KSQL 5.3.0 includes new features, including: +* Drop the requirement that ``CREATE TABLE`` statements must have a ``KEY`` set in their ``WITH`` clause. + This is now an optional optimisation to avoid unnecessary repartition steps. + See `Github issue #2745 `_ for more info. + * Improved handling of ``KEY`` fields. The ``KEY`` field is an optional copy of the Kafka record's key held within the record's value. Users can supply the name of the field that holds the copy of the key within the ``WITH`` clause. @@ -13,7 +17,7 @@ KSQL 5.3.0 includes new features, including: Please note that preexisting persistent queries, e.g. those created via ``CREATE TABLE AS SELECT ...`` or ``CREATE STREAM AS SELECT ...`` or ``INSERT INTO ...``, will continue to have the unnecessary repartition step. This is required to avoid the potential for data loss should this step be dropped. - See `#2280 `_ for more info. + See `Github issue #2636 `_ for more info. Version 5.2.0 diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 8bfcaebfc845..58f43ae0c2a2 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -379,10 +379,7 @@ timestamp and message key, respectively. The timestamp has milliseconds accuracy KSQL has currently the following requirements for creating a table from a Kafka topic: -1. The Kafka message key must also be present as a field/column in the Kafka message value. The ``KEY`` property (see - below) must be defined to inform KSQL which field/column in the message value represents the key. If the message key - is not present in the message value, follow the instructions in :ref:`ksql_key_requirements`. -2. The message key must be in ``VARCHAR`` aka ``STRING`` format. If the message key is not in this format, follow the +1. The message key must be in ``VARCHAR`` aka ``STRING`` format. If the message key is not in this format, follow the instructions in :ref:`ksql_key_requirements`. The WITH clause supports the following properties: @@ -395,13 +392,13 @@ The WITH clause supports the following properties: | VALUE_FORMAT (required) | Specifies the serialization format of message values in the topic. Supported formats: | | | ``JSON``, ``DELIMITED`` (comma-separated value), and ``AVRO``. | +-------------------------+--------------------------------------------------------------------------------------------+ -| KEY (required) | Associates a field/column within the Kafka message value with the implicit ``ROWKEY`` | -| | column (message key) in the KSQL table. | -| | | -| | KSQL currently requires that the Kafka message key, which will be available as the | -| | implicit ``ROWKEY`` column in the table, must also be present as a field/column in the | -| | message value. You must set the KEY property to this corresponding field/column in the | -| | message value, and this column must be in ``VARCHAR`` aka ``STRING`` format. | +| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka | +| | message value, you may set this property to associate the corresponding field/column with | +| | the implicit ``ROWKEY`` column (message key). | +| | If set, KSQL uses it as an optimization hint to determine if repartitioning can be avoided | +| | when performing aggregations and joins. | +| | You can only use this if the key format in kafka is ``VARCHAR`` or ``STRING``. Do not use | +| | this hint if the message key format in kafka is AVRO or JSON. | | | See :ref:`ksql_key_requirements` for more information. | +-------------------------+--------------------------------------------------------------------------------------------+ | TIMESTAMP | By default, the implicit ``ROWTIME`` column is the timestamp of the message in the Kafka | @@ -1649,7 +1646,8 @@ Key Requirements Message Keys ------------ -The ``CREATE STREAM`` and ``CREATE TABLE`` statements, which read data from a Kafka topic into a stream or table, allow you to specify a field/column in the Kafka message value that corresponds to the Kafka message key by setting the ``KEY`` property of the ``WITH`` clause. +The ``CREATE STREAM`` and ``CREATE TABLE`` statements, which read data from a Kafka topic into a stream or table, +allow you to specify a field/column in the Kafka message value that corresponds to the Kafka message key by setting the ``KEY`` property of the ``WITH`` clause. Example: @@ -1659,10 +1657,7 @@ Example: WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY = 'userid'); -The ``KEY`` property is: - -- Required for tables. -- Optional for streams. Here, KSQL uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. +The ``KEY`` property is optional. KSQL uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. .. important:: Don't set the KEY property, unless you have validated that your stream doesn't need to be re-partitioned for future joins. diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java index 4ed536be395c..98308010bce6 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java @@ -38,11 +38,9 @@ public static TimestampExtractionPolicy create( final String fieldName = StringUtil.cleanQuotes(timestampColumnName.toUpperCase()); final Field timestampField = SchemaUtil.getFieldByName(schema, fieldName) - .orElseThrow(() -> new KsqlException(String.format( - "No column with the provided timestamp column name in the " - + "WITH clause, %s, exists in the defined schema.", - fieldName - ))); + .orElseThrow(() -> new KsqlException( + "The TIMESTAMP column '" + fieldName + + "', set in the WITH clause, does not exist in the schema")); final Schema.Type timestampFieldType = timestampField.schema().type(); if (timestampFieldType == Schema.Type.STRING) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java index ddce526ed32c..cbb21e2256ea 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java @@ -97,8 +97,8 @@ abstract class AbstractCreateStreamCommand implements DdlCommand { final String keyFieldName = StringUtil.cleanQuotes(name); final Field keyField = SchemaUtil.getFieldByName(schema, keyFieldName) .orElseThrow(() -> new KsqlException( - "No column with the provided key column name in the WITH " - + "clause, " + keyFieldName + ", exists in the defined schema." + "The KEY column '" + keyFieldName + + "', set in the WITH clause, does not exist in the schema" )); this.keyField = KeyField.of(keyFieldName, keyField); @@ -120,7 +120,6 @@ abstract class AbstractCreateStreamCommand implements DdlCommand { } private static void checkTopicNameNotNull(final Map properties) { - // TODO: move the check to grammar if (properties.get(DdlConfig.TOPIC_NAME_PROPERTY) == null) { throw new KsqlException("Topic name should be set in WITH clause."); } @@ -155,7 +154,6 @@ static void checkMetaData( final String sourceName, final String topicName ) { - // TODO: move the check to the runtime since it accesses metaStore if (metaStore.getSource(sourceName) != null) { throw new KsqlException(String.format("Source already exists: %s", sourceName)); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java index 58207e665f6f..18474350adb4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java @@ -15,15 +15,12 @@ package io.confluent.ksql.ddl.commands; -import io.confluent.ksql.ddl.DdlConfig; import io.confluent.ksql.metastore.MutableMetaStore; import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.parser.tree.CreateTable; -import io.confluent.ksql.parser.tree.Expression; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; -import java.util.Map; public class CreateTableCommand extends AbstractCreateStreamCommand { @@ -33,14 +30,6 @@ public class CreateTableCommand extends AbstractCreateStreamCommand { final KafkaTopicClient kafkaTopicClient ) { super(sqlExpression, createTable, kafkaTopicClient); - - final Map properties = createTable.getProperties(); - - if (!properties.containsKey(DdlConfig.KEY_NAME_PROPERTY)) { - throw new KsqlException( - "Cannot define a TABLE without providing the KEY column name in the WITH clause." - ); - } } @Override @@ -55,6 +44,7 @@ public DdlCommandResult run(final MutableMetaStore metaStore) { } } checkMetaData(metaStore, sourceName, topicName); + final KsqlTable ksqlTable = new KsqlTable<>( sqlExpression, sourceName, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommandTest.java index a851ed110fbb..6ef361b00aac 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommandTest.java @@ -146,6 +146,38 @@ public void shouldNotThrowIfTopicDoesExist() { verify(kafkaTopicClient).isTopicExists(TOPIC_NAME); } + @Test + public void shouldThrowIfKeyFieldNotInSchema() { + // Given: + when(statement.getProperties()).thenReturn(minValidProps()); + givenPropertiesWith(ImmutableMap.of( + DdlConfig.KEY_NAME_PROPERTY, new StringLiteral("will-not-find-me"))); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "The KEY column 'WILL-NOT-FIND-ME', set in the WITH clause, does not exist in the schema"); + + // When: + new TestCmd("key not in schema!", statement, kafkaTopicClient); + } + + @Test + public void shouldThrowIfTimestampColumnDoesNotExist() { + // Given: + when(statement.getProperties()).thenReturn(minValidProps()); + givenPropertiesWith(ImmutableMap.of( + DdlConfig.TIMESTAMP_NAME_PROPERTY, new StringLiteral("will-not-find-me"))); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "The TIMESTAMP column 'WILL-NOT-FIND-ME', set in the WITH clause, does not exist in the schema"); + + // When: + new TestCmd("key not in schema!", statement, kafkaTopicClient); + } + private static Map minValidProps() { return ImmutableMap.of( DdlConfig.VALUE_FORMAT_PROPERTY, new StringLiteral("json"), @@ -160,6 +192,12 @@ private static Map propsWithout(final String name) { return ImmutableMap.copyOf(props); } + private void givenPropertiesWith(final Map additionalProps) { + final Map allProps = new HashMap<>(minValidProps()); + allProps.putAll(additionalProps); + when(statement.getProperties()).thenReturn(allProps); + } + private static final class TestCmd extends AbstractCreateStreamCommand { private TestCmd( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java index f1c9ad22772c..c681def0c7a4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java @@ -17,7 +17,6 @@ import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.expect; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -106,64 +105,6 @@ public void shouldCreateCommandForCreateTable() { assertThat(result, instanceOf(CreateTableCommand.class)); } - @Test - public void shouldFailCreateTableIfKeyNameIsIncorrect() { - final HashMap tableProperties = validTableProps(); - tableProperties.put(DdlConfig.KEY_NAME_PROPERTY, new StringLiteral("COL3")); - - try { - commandFactories - .create(sqlExpression, createTable(tableProperties), NO_PROPS); - - } catch (final KsqlException e) { - assertThat(e.getMessage(), equalTo("No column with the provided key column name in the " - + "WITH clause, COL3, exists in the defined schema.")); - } - - } - - @Test - public void shouldFailCreateTableIfTimestampColumnNameIsIncorrect() { - final HashMap tableProperties = validTableProps(); - tableProperties.put(DdlConfig.TIMESTAMP_NAME_PROPERTY, new StringLiteral("COL3")); - - try { - commandFactories - .create(sqlExpression, createTable(tableProperties), NO_PROPS); - - } catch (final KsqlException e) { - assertThat(e.getMessage(), equalTo("No column with the provided timestamp column name in the WITH clause, COL3, exists in the defined schema.")); - } - } - - @Test - public void shouldFailCreateTableIfKeyIsNotProvided() { - final HashMap tableProperties = validTableProps(); - tableProperties.remove(DdlConfig.KEY_NAME_PROPERTY); - - try { - commandFactories.create(sqlExpression, createTable(properties), NO_PROPS); - - } catch (final KsqlException e) { - assertThat(e.getMessage(), equalTo("Cannot define a TABLE without providing the KEY column name in the WITH clause.")); - } - } - - @Test - public void shouldFailCreateTableIfTopicNotExist() { - final HashMap tableProperties = validTableProps(); - - givenTopicsDoNotExist(); - - try { - commandFactories.create(sqlExpression, createTable(tableProperties), - NO_PROPS); - - } catch (final KsqlException e) { - assertThat(e.getMessage(), equalTo("Kafka topic does not exist: topic")); - } - } - @Test public void shouldCreateCommandForDropStream() { final DdlCommand result = commandFactories.create(sqlExpression, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java index 1760e6f43076..4c82d262f516 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java @@ -15,6 +15,8 @@ package io.confluent.ksql.ddl.commands; +import static io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers.hasLegacyName; +import static io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers.hasName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -41,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.WindowedSerdes; import org.junit.Before; @@ -54,8 +57,11 @@ @RunWith(MockitoJUnitRunner.class) public class CreateStreamCommandTest { + private static final String STREAM_NAME = "s1"; private static final List SOME_ELEMENTS = ImmutableList.of( - new TableElement("bob", PrimitiveType.of(SqlType.STRING))); + new TableElement("ID", PrimitiveType.of(SqlType.BIGINT)), + new TableElement("bob", PrimitiveType.of(SqlType.STRING)) + ); @Mock private KafkaTopicClient topicClient; @@ -71,7 +77,7 @@ public class CreateStreamCommandTest { @Before public void setUp() { givenPropertiesWith((Collections.emptyMap())); - when(createStreamStatement.getName()).thenReturn(QualifiedName.of("name")); + when(createStreamStatement.getName()).thenReturn(QualifiedName.of(STREAM_NAME)); when(createStreamStatement.getElements()).thenReturn(SOME_ELEMENTS); when(topicClient.isTopicExists(any())).thenReturn(true); } @@ -172,19 +178,47 @@ public void shouldThrowIfTopicDoesNotExist() { } @Test - public void testCreateAlreadyRegisteredStreamThrowsException() { + public void shouldThrowIfAlreadyRegistered() { // Given: final CreateStreamCommand cmd = createCmd(); cmd.run(metaStore); // Then: - expectedException.expectMessage("Cannot create stream 'name': A stream " + - "with name 'name' already exists"); + expectedException.expectMessage("Cannot create stream 's1': A stream " + + "with name 's1' already exists"); // When: cmd.run(metaStore); } + @Test + public void shouldAddSourceWithKeyField() { + // Given: + givenPropertiesWith(ImmutableMap.of( + "KEY", new StringLiteral("id"))); + final CreateStreamCommand cmd = createCmd(); + + // When: + cmd.run(metaStore); + + // Then: + assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasName("ID")); + assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasLegacyName("ID")); + } + + @Test + public void shouldAddSourceWithNoKeyField() { + // Given: + final CreateStreamCommand cmd = createCmd(); + + // When: + cmd.run(metaStore); + + // Then: + assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasName(Optional.empty())); + assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasLegacyName(Optional.empty())); + } + private CreateStreamCommand createCmd() { return new CreateStreamCommand("some sql", createStreamStatement, topicClient); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java index 842372fdfa97..2b39b245189a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java @@ -15,6 +15,8 @@ package io.confluent.ksql.ddl.commands; +import static io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers.hasLegacyName; +import static io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers.hasName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -40,6 +42,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.WindowedSerdes; import org.junit.Before; @@ -53,6 +56,8 @@ @RunWith(MockitoJUnitRunner.class) public class CreateTableCommandTest { + private static final String TABLE_NAME = "t1"; + @Mock private KafkaTopicClient topicClient; @Mock @@ -67,7 +72,7 @@ public class CreateTableCommandTest { @Before public void setUp() { givenPropertiesWith((Collections.emptyMap())); - when(createTableStatement.getName()).thenReturn(QualifiedName.of("name")); + when(createTableStatement.getName()).thenReturn(QualifiedName.of(TABLE_NAME)); when(createTableStatement.getElements()).thenReturn(ImmutableList.of( new TableElement("SOME-KEY", PrimitiveType.of(SqlType.STRING)) )); @@ -170,19 +175,48 @@ public void shouldThrowIfTopicDoesNotExist() { } @Test - public void testCreateAlreadyRegisteredTableThrowsException() { + public void shouldThrowIfAlreadyRegistered() { // Given: final CreateTableCommand cmd = createCmd(); cmd.run(metaStore); // Then: - expectedException.expectMessage("Cannot create table 'name': A table " + - "with name 'name' already exists"); + expectedException.expectMessage("Cannot create table 't1': A table " + + "with name 't1' already exists"); + + // When: + cmd.run(metaStore); + } + + @Test + public void shouldAddSourceWithKeyField() { + // Given: + givenPropertiesWith(ImmutableMap.of( + "KEY", new StringLiteral("some-key"))); + final CreateTableCommand cmd = createCmd(); // When: cmd.run(metaStore); + + // Then: + assertThat(metaStore.getSource(TABLE_NAME).getKeyField(), hasName("SOME-KEY")); + assertThat(metaStore.getSource(TABLE_NAME).getKeyField(), hasLegacyName("SOME-KEY")); } + @Test + public void shouldAddSourceWithNoKeyField() { + // Given: + final CreateTableCommand cmd = createCmd(); + + // When: + cmd.run(metaStore); + + // Then: + assertThat(metaStore.getSource(TABLE_NAME).getKeyField(), hasName(Optional.empty())); + assertThat(metaStore.getSource(TABLE_NAME).getKeyField(), hasLegacyName(Optional.empty())); + } + + private CreateTableCommand createCmd() { return new CreateTableCommand("some sql", createTableStatement, topicClient); } @@ -191,7 +225,6 @@ private void givenPropertiesWith(final Map props) { final Map allProps = new HashMap<>(props); allProps.putIfAbsent(DdlConfig.VALUE_FORMAT_PROPERTY, new StringLiteral("Json")); allProps.putIfAbsent(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("some-topic")); - allProps.putIfAbsent(DdlConfig.KEY_NAME_PROPERTY, new StringLiteral("some-key")); when(createTableStatement.getProperties()).thenReturn(allProps); } } \ No newline at end of file diff --git a/ksql-engine/src/test/resources/query-validation-tests/key-field.json b/ksql-engine/src/test/resources/query-validation-tests/key-field.json index 9c3d0fd5d5ca..be755a493f2b 100644 --- a/ksql-engine/src/test/resources/query-validation-tests/key-field.json +++ b/ksql-engine/src/test/resources/query-validation-tests/key-field.json @@ -796,46 +796,38 @@ }, { "name": "table | initially null | no key change | - | -", - "comments": [ - "Note: The INTERMEDIATE table is there to create a source where the key field is set to null" - ], "statements": [ - "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='foo', value_format='JSON');", - "CREATE TABLE INTERMEDIATE AS SELECT bar FROM INPUT;", - "CREATE TABLE OUTPUT AS SELECT * FROM INTERMEDIATE;" + "CREATE TABLE INPUT (bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT * FROM INPUT;" ], "inputs": [ - {"topic": "INTERMEDIATE", "key": "x", "value": {"bar": 1}} + {"topic": "input_topic", "key": "x", "value": {"bar": 1}} ], "outputs": [ {"topic": "OUTPUT", "key": "x", "value": {"BAR": 1}} ], "post": { "sources": [ - {"name": "INTERMEDIATE", "type": "table", "keyField": {"name": null, "legacyName": null}}, + {"name": "INPUT", "type": "table", "keyField": {"name": null, "legacyName": null}}, {"name": "OUTPUT", "type": "table", "keyField": {"name": null, "legacyName": null}} ] } }, { "name": "table | initially null | group by (-) | key in value | no aliasing", - "comments": [ - "Note: The INTERMEDIATE table is there to create a source where the key field is null" - ], "statements": [ - "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='bar', value_format='JSON');", - "CREATE TABLE INTERMEDIATE AS SELECT foo FROM INPUT;", - "CREATE TABLE OUTPUT AS SELECT foo, COUNT(*) FROM INTERMEDIATE GROUP BY foo;" + "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT foo, COUNT(*) FROM INPUT GROUP BY foo;" ], "inputs": [ - {"topic": "INTERMEDIATE", "key": "x", "value": {"foo": 1, "bar": 2}} + {"topic": "input_topic", "key": "x", "value": {"foo": 1, "bar": 2}} ], "outputs": [ {"topic": "OUTPUT", "key": "1", "value": {"FOO": 1, "KSQL_COL_1": 1}} ], "post": { "sources": [ - {"name": "INTERMEDIATE", "type": "table", "keyField": {"name": null, "legacyName": null}}, + {"name": "INPUT", "type": "table", "keyField": {"name": null, "legacyName": null}}, {"name": "OUTPUT", "type": "table", "keyField": {"name": "FOO", "legacyName": "KSQL_INTERNAL_COL_0", "legacySchema": "STRING"}} ] } @@ -847,13 +839,11 @@ "and test the key fields of the entities added to the metastore, to ensure backwards compatibility is maintained", "In this case, the code previously incorrectly set the keyField to 'KSQL_INTERNAL_COL_0' rather than the correct 'FOO'", "This would result in an unnecessary repartition step being added to the DOWNSTREAM query.", - "New versions of the code must not remove this unnecessary step for existing queries, as that would break backwards compatibility", - "Note: The INTERMEDIATE table is there to create a source where the key field is set to null" + "New versions of the code must not remove this unnecessary step for existing queries, as that would break backwards compatibility" ], "statements": [ - "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='bar', value_format='JSON');", - "CREATE TABLE INTERMEDIATE AS SELECT foo FROM INPUT;", - "CREATE TABLE OUTPUT AS SELECT foo, COUNT(*) FROM INTERMEDIATE GROUP BY foo;", + "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT foo, COUNT(*) FROM INPUT GROUP BY foo;", "CREATE TABLE DOWNSTREAM AS SELECT foo, COUNT(*) FROM OUTPUT GROUP BY foo;" ], "properties": { @@ -875,23 +865,21 @@ { "name": "table | initially null | group by (-) | key in value | aliasing", "comments": [ - "Note: The INTERMEDIATE table is there to create a source where the key field is set to null", - "note: GROUP BY takes the name of the field from the source schema." + "Note: GROUP BY takes the name of the field from the source schema." ], "statements": [ - "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='bar', value_format='JSON');", - "CREATE TABLE INTERMEDIATE AS SELECT foo FROM INPUT;", - "CREATE TABLE OUTPUT AS SELECT foo AS aliased, COUNT(*) FROM INTERMEDIATE GROUP BY foo;" + "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT foo AS aliased, COUNT(*) FROM INPUT GROUP BY foo;" ], "inputs": [ - {"topic": "INTERMEDIATE", "key": "x", "value": {"foo": 1, "bar": 2}} + {"topic": "input_topic", "key": "x", "value": {"foo": 1, "bar": 2}} ], "outputs": [ {"topic": "OUTPUT", "key": "1", "value": {"ALIASED": 1, "KSQL_COL_1": 1}} ], "post": { "sources": [ - {"name": "INTERMEDIATE", "type": "table", "keyField": {"name": null, "legacyName": null}}, + {"name": "INPUT", "type": "table", "keyField": {"name": null, "legacyName": null}}, {"name": "OUTPUT", "type": "table", "keyField": {"name": "ALIASED", "legacyName": "KSQL_INTERNAL_COL_0", "legacySchema": "STRING"}} ] } @@ -903,13 +891,11 @@ "and test the key fields of the entities added to the metastore, to ensure backwards compatibility is maintained", "In this case, the code previously incorrectly set the keyField to 'KSQL_INTERNAL_COL_0' rather than the correct 'ALIASED'", "This would result in an unnecessary repartition step being added to the DOWNSTREAM query.", - "New versions of the code must not remove this unnecessary step for existing queries, as that would break backwards compatibility", - "Note: The INTERMEDIATE table is there to create a source where the key field is set to null" + "New versions of the code must not remove this unnecessary step for existing queries, as that would break backwards compatibility" ], "statements": [ - "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='bar', value_format='JSON');", - "CREATE TABLE INTERMEDIATE AS SELECT foo FROM INPUT;", - "CREATE TABLE OUTPUT AS SELECT foo AS aliased, COUNT(*) FROM INTERMEDIATE GROUP BY foo;", + "CREATE TABLE INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT foo AS aliased, COUNT(*) FROM INPUT GROUP BY foo;", "CREATE TABLE DOWNSTREAM AS SELECT aliased, COUNT(*) FROM OUTPUT GROUP BY aliased;" ], "properties": { diff --git a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/MetaStoreMatchers.java b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/MetaStoreMatchers.java index 081df6dfbb9c..a70e7def0d72 100644 --- a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/MetaStoreMatchers.java +++ b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/MetaStoreMatchers.java @@ -69,6 +69,10 @@ public static final class KeyFieldMatchers { private KeyFieldMatchers() { } + public static Matcher hasName(final String name) { + return hasName(Optional.of(name)); + } + public static Matcher hasName(final Optional name) { return new FeatureMatcher> (is(name), "field with name", "name") { @@ -79,6 +83,10 @@ protected Optional featureValueOf(final KeyField actual) { }; } + public static Matcher hasLegacyName(final String name) { + return hasLegacyName(Optional.of(name)); + } + public static Matcher hasLegacyName(final Optional name) { return new FeatureMatcher> (is(name), "field with legacy name", "legacy name") {