From 237d3519fea25a26d8c0e9ca3179445a8cc37f87 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 19 Nov 2020 12:44:24 -0800 Subject: [PATCH 1/4] fix: propagate null-valued records in repartition --- .../ksql/schema/ksql/LogicalSchema.java | 18 +++++ .../ksql/schema/ksql/LogicalSchemaTest.java | 21 ++++++ .../io/confluent/ksql/analyzer/Analyzer.java | 1 + .../ksql/planner/RequiredColumns.java | 2 +- .../ksql/execution/util}/ColumnExtractor.java | 2 +- .../ksql/execution/util/StructKeyUtil.java | 2 +- .../query-validation-tests/partition-by.json | 45 ++++++++++++ .../streams/PartitionByParamsFactory.java | 48 ++++++++++-- .../streams/PartitionByParamsFactoryTest.java | 73 +++++++++++++++++++ 9 files changed, 201 insertions(+), 11 deletions(-) rename {ksqldb-engine/src/main/java/io/confluent/ksql/analyzer => ksqldb-execution/src/main/java/io/confluent/ksql/execution/util}/ColumnExtractor.java (97%) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index 79232235e27e..69dac659a6e3 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -138,6 +138,24 @@ public LogicalSchema withoutPseudoAndKeyColsInValue() { return rebuild(false, false); } + /** + * Remove all non-key columns from the value, and copy all key columns into the value. + * + * @return the new schema + */ + public LogicalSchema withKeyColsOnly() { + final List key = byNamespace().get(Namespace.KEY); + + final ImmutableList.Builder builder = ImmutableList.builder(); + builder.addAll(key); + int valueIndex = 0; + for (final Column c : key) { + builder.add(Column.of(c.name(), c.type(), VALUE, valueIndex++)); + } + + return new LogicalSchema(builder.build()); + } + /** * @param columnName the column name to check * @return {@code true} if the column matches the name of any key column. diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index 81ff3f7b58cd..5b1f8884b501 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -541,6 +541,27 @@ public void shouldRemoveKeyColumnsWhereEverTheyAre() { )); } + @Test + public void shouldRemoveAllButKeyCols() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(K0, INTEGER) + .valueColumn(F0, BIGINT) + .valueColumn(F1, BIGINT) + .build() + .withPseudoAndKeyColsInValue(false); + + // When + final LogicalSchema result = schema.withKeyColsOnly(); + + // Then: + assertThat(result, is(LogicalSchema.builder() + .keyColumn(K0, INTEGER) + .valueColumn(K0, INTEGER) + .build() + )); + } + @Test public void shouldMatchMetaColumnName() { assertThat(SystemColumns.isPseudoColumn(ROWTIME_NAME), is(true)); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index f59fabe599cf..32bafbeb29b6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -32,6 +32,7 @@ import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression; import io.confluent.ksql.execution.expression.tree.NullLiteral; import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor; +import io.confluent.ksql.execution.util.ColumnExtractor; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/RequiredColumns.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/RequiredColumns.java index 847811f9ba38..990b4e60d9c6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/RequiredColumns.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/RequiredColumns.java @@ -19,9 +19,9 @@ import com.google.common.collect.ImmutableSet; import com.google.errorprone.annotations.Immutable; -import io.confluent.ksql.analyzer.ColumnExtractor; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.util.ColumnExtractor; import java.util.Collection; import java.util.HashSet; import java.util.Objects; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/ColumnExtractor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ColumnExtractor.java similarity index 97% rename from ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/ColumnExtractor.java rename to ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ColumnExtractor.java index 64544c69e318..fe764897c5c1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/ColumnExtractor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ColumnExtractor.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.analyzer; +package io.confluent.ksql.execution.util; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java index 10dc8e2b0774..aeeb0500e472 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java @@ -58,7 +58,7 @@ public static KeyBuilder keyBuilder(final ColumnName name, final SqlType type) { } @SuppressWarnings("unchecked") - public static List asList(final Object key) { + public static List asList(final Object key) { final Optional> windowed = key instanceof Windowed ? Optional.of((Windowed) key) : Optional.empty(); diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json index 867b50393045..fe410ab4be24 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -533,6 +533,51 @@ {"topic": "REPARTITIONED", "key": "zero", "value": "0,50,0"} ] }, + { + "name": "only key column - with null value", + "statements": [ + "CREATE STREAM TEST (K BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited');", + "CREATE STREAM REPARTITIONED AS select * from TEST partition by K;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": null}, + {"topic": "test_topic", "key": 0, "value": "0,zero,50"} + ], + "outputs": [ + {"topic": "REPARTITIONED", "key": 0, "value": null}, + {"topic": "REPARTITIONED", "key": 0, "value": "0,zero,50"} + ] + }, + { + "name": "key expression - with null value", + "statements": [ + "CREATE STREAM TEST (K BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited');", + "CREATE STREAM REPARTITIONED AS select K + 2, ID, NAME, VALUE from TEST partition by K + 2;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": null}, + {"topic": "test_topic", "key": 0, "value": "0,zero,50"} + ], + "outputs": [ + {"topic": "REPARTITIONED", "key": 2, "value": null}, + {"topic": "REPARTITIONED", "key": 2, "value": "0,zero,50"} + ] + }, + { + "name": "udf key expression - with null value", + "statements": [ + "CREATE STREAM TEST (K BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited');", + "CREATE STREAM REPARTITIONED AS select ABS(K), ID, NAME, VALUE from TEST partition by ABS(K);" + ], + "inputs": [ + {"topic": "test_topic", "key": -1, "value": null}, + {"topic": "test_topic", "key": -1, "value": "0,zero,50"} + ], + "outputs": [ + {"topic": "REPARTITIONED", "key": 1, "value": null}, + {"topic": "REPARTITIONED", "key": 1, "value": "0,zero,50"} + ] + }, { "name": "partition by with null partition by value", "statements": [ diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java index c2573c5f4434..8375aa53873e 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java @@ -21,6 +21,7 @@ import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.NullLiteral; +import io.confluent.ksql.execution.util.ColumnExtractor; import io.confluent.ksql.execution.util.ExpressionTypeManager; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; @@ -33,7 +34,9 @@ import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.KsqlConfig; +import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import org.apache.kafka.connect.data.Struct; @@ -87,12 +90,19 @@ public static PartitionByParams build( // is already present in the current value mapper = (k, v) -> new KeyValue<>(null, v); } else { - final Function evaluator = buildExpressionEvaluator( + final Set partitionByCols = + ColumnExtractor.extractColumns(partitionBy); + final boolean isKeyExpression = partitionByCols.stream() + .map(ColumnReferenceExp::getColumnName) + .allMatch(sourceSchema::isKeyColumn); + + final PartitionByExpressionEvaluator evaluator = buildExpressionEvaluator( sourceSchema, partitionBy, ksqlConfig, functionRegistry, - logger + logger, + isKeyExpression ); mapper = buildMapper(resultSchema, partitionByCol, evaluator); } @@ -100,6 +110,20 @@ public static PartitionByParams build( return new PartitionByParams(resultSchema, mapper); } + private static class PartitionByExpressionEvaluator { + + private final Function evaluator; + private final boolean acceptsKey; + + PartitionByExpressionEvaluator( + final Function evaluator, + final boolean acceptsKey + ) { + this.evaluator = Objects.requireNonNull(evaluator, "evaluator"); + this.acceptsKey = acceptsKey; + } + } + public static LogicalSchema buildSchema( final LogicalSchema sourceSchema, final Expression partitionBy, @@ -162,7 +186,7 @@ private static Optional getPartitionByColumnName( private static BiFunction> buildMapper( final LogicalSchema resultSchema, final Optional partitionByCol, - final Function evaluator + final PartitionByExpressionEvaluator evaluator ) { // If partitioning by something other than an existing column, then a new key will have // been synthesized. This new key must be appended to the value to make it available for @@ -172,7 +196,11 @@ private static BiFunction> buil final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema); return (k, v) -> { - final Object newKey = evaluator.apply(v); + final Object newKey = evaluator.evaluator.apply( + evaluator.acceptsKey + ? GenericRow.fromList(StructKeyUtil.asList(k)) + : v + ); final Struct structKey = keyBuilder.build(newKey, 0); if (v != null && appendNewKey) { @@ -183,15 +211,16 @@ private static BiFunction> buil }; } - private static Function buildExpressionEvaluator( + private static PartitionByExpressionEvaluator buildExpressionEvaluator( final LogicalSchema schema, final Expression partitionBy, final KsqlConfig ksqlConfig, final FunctionRegistry functionRegistry, - final ProcessingLogger logger + final ProcessingLogger logger, + final boolean isKeyExpression ) { final CodeGenRunner codeGen = new CodeGenRunner( - schema, + isKeyExpression ? schema.withKeyColsOnly() : schema, ksqlConfig, functionRegistry ); @@ -202,6 +231,9 @@ private static Function buildExpressionEvaluator( final String errorMsg = "Error computing new key from expression " + expressionMetadata.getExpression(); - return row -> expressionMetadata.evaluate(row, null, logger, () -> errorMsg); + return new PartitionByExpressionEvaluator( + row -> expressionMetadata.evaluate(row, null, logger, () -> errorMsg), + isKeyExpression + ); } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java index 13fca920c6ef..dca64469cef2 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -24,12 +25,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression.Sign; import io.confluent.ksql.execution.expression.tree.DereferenceExpression; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.execution.expression.tree.NullLiteral; +import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; @@ -41,6 +44,7 @@ import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.schema.Operator; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SystemColumns; import io.confluent.ksql.schema.ksql.types.SqlStruct; @@ -275,6 +279,41 @@ public void shouldSetNewKey() { assertThat(result.key, is(keyBuilder.build(COL1_VALUE, 0))); } + @Test + public void shouldPropagateNullValueWhenPartitioningByKey() { + // Given: + final BiFunction> mapper = + partitionBy(new UnqualifiedColumnReferenceExp(COL0)).getMapper(); + + // When: + final KeyValue result = mapper.apply(key, null); + + // Then: + final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(COL0, SqlTypes.STRING); + assertThat(result.key, is(keyBuilder.build(OLD_KEY, 0))); + assertThat(result.value, is(nullValue())); + } + + @Test + public void shouldPropagateNullValueWhenPartitioningByKeyExpression() { + // Given: + final BiFunction> mapper = + partitionBy(new ArithmeticBinaryExpression( + Operator.ADD, + new UnqualifiedColumnReferenceExp(COL0), + new StringLiteral("-foo")) + ).getMapper(); + + // When: + final KeyValue result = mapper.apply(key, null); + + // Then: + final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder( + ColumnName.of("KSQL_COL_0"), SqlTypes.STRING); + assertThat(result.key, is(keyBuilder.build(OLD_KEY + "-foo", 0))); + assertThat(result.value, is(nullValue())); + } + @Test public void shouldNotChangeValueIfPartitioningByColumnReference() { // Given: @@ -290,6 +329,21 @@ public void shouldNotChangeValueIfPartitioningByColumnReference() { assertThat(result.value, is(GenericRow.fromList(originals))); } + @Test + public void shouldNotChangeValueIfPartitioningByKeyColumnReference() { + // Given: + final BiFunction> mapper = + partitionBy(new UnqualifiedColumnReferenceExp(COL0)).getMapper(); + + final ImmutableList originals = ImmutableList.copyOf(value.values()); + + // When: + final KeyValue result = mapper.apply(key, value); + + // Then: + assertThat(result.value, is(GenericRow.fromList(originals))); + } + @Test public void shouldAppendNewKeyColumnToValueIfNotPartitioningByColumnReference() { // Given: @@ -308,6 +362,25 @@ public void shouldAppendNewKeyColumnToValueIfNotPartitioningByColumnReference() assertThat(result.value, is(GenericRow.fromList(originals).append(ConstantUdf.VALUE))); } + @Test + public void shouldAppendNewKeyColumnToValueIfPartitioningByKeyExpression() { + // Given: + final BiFunction> mapper = + partitionBy(new ArithmeticBinaryExpression( + Operator.ADD, + new UnqualifiedColumnReferenceExp(COL0), + new StringLiteral("-foo")) + ).getMapper(); + + final ImmutableList originals = ImmutableList.copyOf(value.values()); + + // When: + final KeyValue result = mapper.apply(key, value); + + // Then: + assertThat(result.value, is(GenericRow.fromList(originals).append(OLD_KEY + "-foo"))); + } + @Test public void shouldNotChangeValueIfPartitioningByNull() { // Given: From a9c0de09da22cd6552c140b3b7b5056893a9c3dc Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 19 Nov 2020 14:42:01 -0800 Subject: [PATCH 2/4] chore: historic plans --- .../6.2.0_1605825669504/plan.json | 150 ++++++++++++++++++ .../6.2.0_1605825669504/spec.json | 100 ++++++++++++ .../6.2.0_1605825669504/topology | 16 ++ .../6.2.0_1605825669435/plan.json | 143 +++++++++++++++++ .../6.2.0_1605825669435/spec.json | 100 ++++++++++++ .../6.2.0_1605825669435/topology | 13 ++ .../6.2.0_1605825669538/plan.json | 150 ++++++++++++++++++ .../6.2.0_1605825669538/spec.json | 100 ++++++++++++ .../6.2.0_1605825669538/topology | 16 ++ 9 files changed, 788 insertions(+) create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/topology diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/plan.json new file mode 100644 index 000000000000..021791637bf8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/plan.json @@ -0,0 +1,150 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K BIGINT KEY, ID BIGINT, NAME STRING, VALUE BIGINT) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM REPARTITIONED AS SELECT\n (TEST.K + 2) KSQL_COL_0,\n TEST.ID ID,\n TEST.NAME NAME,\n TEST.VALUE VALUE\nFROM TEST TEST\nPARTITION BY (TEST.K + 2)\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "REPARTITIONED", + "schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "topicName" : "REPARTITIONED", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "REPARTITIONED", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "REPARTITIONED" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT" + }, + "keyExpression" : "(K + 2)" + }, + "keyColumnNames" : [ "KSQL_COL_0" ], + "selectExpressions" : [ "ID AS ID", "NAME AS NAME", "VALUE AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "REPARTITIONED" + }, + "queryId" : "CSAS_REPARTITIONED_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/spec.json new file mode 100644 index 000000000000..f41ea4bd2ef4 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "6.2.0", + "timestamp" : 1605825669504, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_REPARTITIONED_0.KsqlTopic.Source" : { + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_REPARTITIONED_0.REPARTITIONED" : { + "schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "key expression - with null value", + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : null + }, { + "topic" : "test_topic", + "key" : 0, + "value" : "0,zero,50" + } ], + "outputs" : [ { + "topic" : "REPARTITIONED", + "key" : 2, + "value" : null + }, { + "topic" : "REPARTITIONED", + "key" : 2, + "value" : "0,zero,50" + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "REPARTITIONED", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited');", "CREATE STREAM REPARTITIONED AS select K + 2, ID, NAME, VALUE from TEST partition by K + 2;" ], + "post" : { + "sources" : [ { + "name" : "REPARTITIONED", + "type" : "STREAM", + "schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "REPARTITIONED", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/topology new file mode 100644 index 000000000000..ec9879fdad67 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_key_expression_-_with_null_value/6.2.0_1605825669504/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: REPARTITIONED) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/plan.json new file mode 100644 index 000000000000..6208c5929b87 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/plan.json @@ -0,0 +1,143 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K BIGINT KEY, ID BIGINT, NAME STRING, VALUE BIGINT) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM REPARTITIONED AS SELECT *\nFROM TEST TEST\nPARTITION BY TEST.K\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "REPARTITIONED", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "topicName" : "REPARTITIONED", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "REPARTITIONED", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "REPARTITIONED" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "ID AS ID", "NAME AS NAME", "VALUE AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "REPARTITIONED" + }, + "queryId" : "CSAS_REPARTITIONED_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/spec.json new file mode 100644 index 000000000000..a7d74179b24d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "6.2.0", + "timestamp" : 1605825669435, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_REPARTITIONED_0.KsqlTopic.Source" : { + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_REPARTITIONED_0.REPARTITIONED" : { + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "only key column - with null value", + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : null + }, { + "topic" : "test_topic", + "key" : 0, + "value" : "0,zero,50" + } ], + "outputs" : [ { + "topic" : "REPARTITIONED", + "key" : 0, + "value" : null + }, { + "topic" : "REPARTITIONED", + "key" : 0, + "value" : "0,zero,50" + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "REPARTITIONED", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited');", "CREATE STREAM REPARTITIONED AS select * from TEST partition by K;" ], + "post" : { + "sources" : [ { + "name" : "REPARTITIONED", + "type" : "STREAM", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "REPARTITIONED", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/topology new file mode 100644 index 000000000000..ab7f6f672a54 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_only_key_column_-_with_null_value/6.2.0_1605825669435/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: REPARTITIONED) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/plan.json new file mode 100644 index 000000000000..7e9e1b2f3262 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/plan.json @@ -0,0 +1,150 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K BIGINT KEY, ID BIGINT, NAME STRING, VALUE BIGINT) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM REPARTITIONED AS SELECT\n ABS(TEST.K) KSQL_COL_0,\n TEST.ID ID,\n TEST.NAME NAME,\n TEST.VALUE VALUE\nFROM TEST TEST\nPARTITION BY ABS(TEST.K)\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "REPARTITIONED", + "schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "topicName" : "REPARTITIONED", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "REPARTITIONED", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "REPARTITIONED" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT" + }, + "keyExpression" : "ABS(K)" + }, + "keyColumnNames" : [ "KSQL_COL_0" ], + "selectExpressions" : [ "ID AS ID", "NAME AS NAME", "VALUE AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "REPARTITIONED" + }, + "queryId" : "CSAS_REPARTITIONED_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/spec.json new file mode 100644 index 000000000000..bae9ea4c31e5 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "6.2.0", + "timestamp" : 1605825669538, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_REPARTITIONED_0.KsqlTopic.Source" : { + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_REPARTITIONED_0.REPARTITIONED" : { + "schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "udf key expression - with null value", + "inputs" : [ { + "topic" : "test_topic", + "key" : -1, + "value" : null + }, { + "topic" : "test_topic", + "key" : -1, + "value" : "0,zero,50" + } ], + "outputs" : [ { + "topic" : "REPARTITIONED", + "key" : 1, + "value" : null + }, { + "topic" : "REPARTITIONED", + "key" : 1, + "value" : "0,zero,50" + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "REPARTITIONED", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited');", "CREATE STREAM REPARTITIONED AS select ABS(K), ID, NAME, VALUE from TEST partition by ABS(K);" ], + "post" : { + "sources" : [ { + "name" : "REPARTITIONED", + "type" : "STREAM", + "schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "REPARTITIONED", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/topology new file mode 100644 index 000000000000..ec9879fdad67 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_udf_key_expression_-_with_null_value/6.2.0_1605825669538/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: REPARTITIONED) + <-- Project + From ad0f0311b3e3d5844f570179486bfacda4badc76 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 19 Nov 2020 14:48:09 -0800 Subject: [PATCH 3/4] chore: style --- .../streams/PartitionByParamsFactory.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java index 8375aa53873e..64a6fedfa470 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java @@ -110,20 +110,6 @@ public static PartitionByParams build( return new PartitionByParams(resultSchema, mapper); } - private static class PartitionByExpressionEvaluator { - - private final Function evaluator; - private final boolean acceptsKey; - - PartitionByExpressionEvaluator( - final Function evaluator, - final boolean acceptsKey - ) { - this.evaluator = Objects.requireNonNull(evaluator, "evaluator"); - this.acceptsKey = acceptsKey; - } - } - public static LogicalSchema buildSchema( final LogicalSchema sourceSchema, final Expression partitionBy, @@ -236,4 +222,18 @@ private static PartitionByExpressionEvaluator buildExpressionEvaluator( isKeyExpression ); } + + private static class PartitionByExpressionEvaluator { + + private final Function evaluator; + private final boolean acceptsKey; + + PartitionByExpressionEvaluator( + final Function evaluator, + final boolean acceptsKey + ) { + this.evaluator = Objects.requireNonNull(evaluator, "evaluator"); + this.acceptsKey = acceptsKey; + } + } } From bd0e3389e1202ba80474671424c9ce80c93b267d Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Fri, 20 Nov 2020 10:12:13 -0800 Subject: [PATCH 4/4] chore: feedback --- .../streams/PartitionByParamsFactory.java | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java index 64a6fedfa470..496cffc11f11 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java @@ -38,7 +38,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; -import java.util.function.Function; +import java.util.function.Supplier; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; @@ -92,7 +92,7 @@ public static PartitionByParams build( } else { final Set partitionByCols = ColumnExtractor.extractColumns(partitionBy); - final boolean isKeyExpression = partitionByCols.stream() + final boolean partitionByInvolvesKeyColsOnly = partitionByCols.stream() .map(ColumnReferenceExp::getColumnName) .allMatch(sourceSchema::isKeyColumn); @@ -102,7 +102,7 @@ public static PartitionByParams build( ksqlConfig, functionRegistry, logger, - isKeyExpression + partitionByInvolvesKeyColsOnly ); mapper = buildMapper(resultSchema, partitionByCol, evaluator); } @@ -182,11 +182,7 @@ private static BiFunction> buil final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema); return (k, v) -> { - final Object newKey = evaluator.evaluator.apply( - evaluator.acceptsKey - ? GenericRow.fromList(StructKeyUtil.asList(k)) - : v - ); + final Object newKey = evaluator.evaluate(k, v); final Struct structKey = keyBuilder.build(newKey, 0); if (v != null && appendNewKey) { @@ -203,10 +199,10 @@ private static PartitionByExpressionEvaluator buildExpressionEvaluator( final KsqlConfig ksqlConfig, final FunctionRegistry functionRegistry, final ProcessingLogger logger, - final boolean isKeyExpression + final boolean partitionByInvolvesKeyColsOnly ) { final CodeGenRunner codeGen = new CodeGenRunner( - isKeyExpression ? schema.withKeyColsOnly() : schema, + partitionByInvolvesKeyColsOnly ? schema.withKeyColsOnly() : schema, ksqlConfig, functionRegistry ); @@ -218,22 +214,37 @@ private static PartitionByExpressionEvaluator buildExpressionEvaluator( + expressionMetadata.getExpression(); return new PartitionByExpressionEvaluator( - row -> expressionMetadata.evaluate(row, null, logger, () -> errorMsg), - isKeyExpression + expressionMetadata, + logger, + () -> errorMsg, + partitionByInvolvesKeyColsOnly ); } private static class PartitionByExpressionEvaluator { - private final Function evaluator; - private final boolean acceptsKey; + private final ExpressionMetadata expressionMetadata; + private final ProcessingLogger logger; + private final Supplier errorMsg; + private final boolean evaluateOnKeyOnly; PartitionByExpressionEvaluator( - final Function evaluator, - final boolean acceptsKey + final ExpressionMetadata expressionMetadata, + final ProcessingLogger logger, + final Supplier errorMsg, + final boolean evaluateOnKeyOnly ) { - this.evaluator = Objects.requireNonNull(evaluator, "evaluator"); - this.acceptsKey = acceptsKey; + this.expressionMetadata = Objects.requireNonNull(expressionMetadata, "expressionMetadata"); + this.logger = Objects.requireNonNull(logger, "logger"); + this.errorMsg = Objects.requireNonNull(errorMsg, "errorMsg"); + this.evaluateOnKeyOnly = evaluateOnKeyOnly; + } + + Object evaluate(final Object key, final GenericRow value) { + final GenericRow row = evaluateOnKeyOnly + ? GenericRow.fromList(StructKeyUtil.asList(key)) + : value; + return expressionMetadata.evaluate(row, null, logger, errorMsg); } } }