
fix: propagate null-valued records in repartition #6647

Merged: 4 commits, Nov 20, 2020
Changes from 3 commits
@@ -138,6 +138,24 @@ public LogicalSchema withoutPseudoAndKeyColsInValue() {
return rebuild(false, false);
}

/**
* Remove all non-key columns from the value, and copy all key columns into the value.
*
* @return the new schema
*/
public LogicalSchema withKeyColsOnly() {
final List<Column> key = byNamespace().get(Namespace.KEY);

final ImmutableList.Builder<Column> builder = ImmutableList.builder();
builder.addAll(key);
int valueIndex = 0;
for (final Column c : key) {
builder.add(Column.of(c.name(), c.type(), VALUE, valueIndex++));
}

return new LogicalSchema(builder.build());
}
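Outside the diff, the effect of `withKeyColsOnly()` can be modeled with a standalone sketch. The `Column` record and `Namespace` enum below are hypothetical simplified stand-ins, not the ksqlDB classes; the sketch only mirrors the copy-keys-into-value step shown above:

```java
import java.util.ArrayList;
import java.util.List;

public class KeyColsOnlySketch {
    enum Namespace { KEY, VALUE }

    // Hypothetical simplified stand-in for ksqlDB's Column.
    record Column(String name, String type, Namespace ns, int index) {}

    // Mirrors withKeyColsOnly(): drop all value columns, then copy each
    // key column into the value namespace with a fresh value index.
    static List<Column> withKeyColsOnly(List<Column> schema) {
        List<Column> keys = schema.stream()
                .filter(c -> c.ns() == Namespace.KEY)
                .toList();
        List<Column> result = new ArrayList<>(keys);
        int valueIndex = 0;
        for (Column c : keys) {
            result.add(new Column(c.name(), c.type(), Namespace.VALUE, valueIndex++));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Column> schema = List.of(
                new Column("K0", "INTEGER", Namespace.KEY, 0),
                new Column("F0", "BIGINT", Namespace.VALUE, 0),
                new Column("F1", "BIGINT", Namespace.VALUE, 1));
        // Result: K0 kept as KEY, plus K0 copied as VALUE at index 0;
        // F0 and F1 are dropped.
        System.out.println(withKeyColsOnly(schema));
    }
}
```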

/**
* @param columnName the column name to check
* @return {@code true} if the column matches the name of any key column.
@@ -541,6 +541,27 @@ public void shouldRemoveKeyColumnsWhereEverTheyAre() {
));
}

@Test
public void shouldRemoveAllButKeyCols() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(K0, INTEGER)
.valueColumn(F0, BIGINT)
.valueColumn(F1, BIGINT)
.build()
.withPseudoAndKeyColsInValue(false);

// When:
final LogicalSchema result = schema.withKeyColsOnly();

// Then:
assertThat(result, is(LogicalSchema.builder()
.keyColumn(K0, INTEGER)
.valueColumn(K0, INTEGER)
.build()
));
}

@Test
public void shouldMatchMetaColumnName() {
assertThat(SystemColumns.isPseudoColumn(ROWTIME_NAME), is(true));
@@ -32,6 +32,7 @@
import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor;
import io.confluent.ksql.execution.util.ColumnExtractor;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
@@ -19,9 +19,9 @@

import com.google.common.collect.ImmutableSet;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.analyzer.ColumnExtractor;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.util.ColumnExtractor;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
@@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.analyzer;
package io.confluent.ksql.execution.util;
Contributor Author:

I don't love the package this was moved to but couldn't think of anywhere better. Suggestions welcome!

Contributor:

🤷 it's not a public API so it doesn't really matter to me


import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
@@ -58,7 +58,7 @@ public static KeyBuilder keyBuilder(final ColumnName name, final SqlType type) {
}

@SuppressWarnings("unchecked")
public static List<?> asList(final Object key) {
public static List<Object> asList(final Object key) {
Contributor Author:

Is there a reason this was previously List<?> rather than List<Object>? I don't fully understand the meaning of this change, but it lets us avoid an unchecked cast downstream.

Contributor:

https://stackoverflow.com/a/35273095/2258040 - for our use case, I don't think it really makes sense to return List<?>
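For illustration (a standalone sketch, not the ksqlDB code): a `List<?>` return type forces a caller that wants a `List<Object>` to make an unchecked cast, while returning `List<Object>` directly needs no cast:

```java
import java.util.Arrays;
import java.util.List;

public class WildcardVsObject {
    // Returning a wildcard type: callers cannot reuse this as a
    // List<Object> without an unchecked cast.
    static List<?> asWildcard(Object key) {
        return Arrays.asList(key);
    }

    // Returning List<Object>: usable downstream with no cast.
    static List<Object> asObjects(Object key) {
        return Arrays.asList(key);
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<?> w = asWildcard("k");
        List<Object> fromWildcard = (List<Object>) w; // unchecked cast required

        List<Object> o = asObjects("k"); // no cast needed
        System.out.println(fromWildcard.equals(o));
        // true
    }
}
```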

final Optional<Windowed<Object>> windowed = key instanceof Windowed
? Optional.of((Windowed<Object>) key)
: Optional.empty();
@@ -0,0 +1,150 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K BIGINT KEY, ID BIGINT, NAME STRING, VALUE BIGINT) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM REPARTITIONED AS SELECT\n (TEST.K + 2) KSQL_COL_0,\n TEST.ID ID,\n TEST.NAME NAME,\n TEST.VALUE VALUE\nFROM TEST TEST\nPARTITION BY (TEST.K + 2)\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "REPARTITIONED",
"schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT",
"topicName" : "REPARTITIONED",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "REPARTITIONED",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "REPARTITIONED"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSelectKeyV2",
"properties" : {
"queryContext" : "PartitionBy"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT"
},
"keyExpression" : "(K + 2)"
},
"keyColumnNames" : [ "KSQL_COL_0" ],
"selectExpressions" : [ "ID AS ID", "NAME AS NAME", "VALUE AS VALUE" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "REPARTITIONED"
},
"queryId" : "CSAS_REPARTITIONED_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.key.format.enabled" : "false",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
@@ -0,0 +1,100 @@
{
"version" : "6.2.0",
"timestamp" : 1605825669504,
"path" : "query-validation-tests/partition-by.json",
"schemas" : {
"CSAS_REPARTITIONED_0.KsqlTopic.Source" : {
"schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"CSAS_REPARTITIONED_0.REPARTITIONED" : {
"schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
}
},
"testCase" : {
"name" : "key expression - with null value",
"inputs" : [ {
"topic" : "test_topic",
"key" : 0,
"value" : null
}, {
"topic" : "test_topic",
"key" : 0,
"value" : "0,zero,50"
} ],
"outputs" : [ {
"topic" : "REPARTITIONED",
"key" : 2,
"value" : null
}, {
"topic" : "REPARTITIONED",
"key" : 2,
"value" : "0,zero,50"
} ],
"topics" : [ {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "REPARTITIONED",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (K BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) with (kafka_topic='test_topic', value_format = 'delimited');", "CREATE STREAM REPARTITIONED AS select K + 2, ID, NAME, VALUE from TEST partition by K + 2;" ],
"post" : {
"sources" : [ {
"name" : "REPARTITIONED",
"type" : "STREAM",
"schema" : "`KSQL_COL_0` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "DELIMITED",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "TEST",
"type" : "STREAM",
"schema" : "`K` BIGINT KEY, `ID` BIGINT, `NAME` STRING, `VALUE` BIGINT",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "DELIMITED",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "REPARTITIONED",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
},
"partitions" : 4
}, {
"name" : "test_topic",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
},
"partitions" : 4
} ]
}
}
}
}
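The test case above can be sketched in plain Java (a hypothetical simplified `Message` record, not the ksqlDB implementation): because the `PARTITION BY K + 2` key expression depends only on the key, a record whose value is null can still be rekeyed and must propagate rather than be dropped:

```java
import java.util.List;
import java.util.stream.Collectors;

public class NullRepartitionSketch {
    // Hypothetical simplified record: a keyed message whose value may be null.
    record Message(long key, String value) {}

    // PARTITION BY K + 2: the new key is computed from the old key alone,
    // so a null value never prevents a record from being forwarded.
    static List<Message> repartition(List<Message> input) {
        return input.stream()
                .map(m -> new Message(m.key() + 2, m.value()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Message> out = repartition(List.of(
                new Message(0, null),            // null-valued record
                new Message(0, "0,zero,50")));
        out.forEach(m -> System.out.println(m.key() + " -> " + m.value()));
        // 2 -> null
        // 2 -> 0,zero,50
    }
}
```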
@@ -0,0 +1,16 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> PartitionBy-SelectKey
<-- KSTREAM-SOURCE-0000000000
Processor: PartitionBy-SelectKey (stores: [])
--> Project
<-- KSTREAM-TRANSFORMVALUES-0000000001
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000004
<-- PartitionBy-SelectKey
Sink: KSTREAM-SINK-0000000004 (topic: REPARTITIONED)
<-- Project
