feat: support PARTITION BY NULL for creating keyless stream (#6096)
vcrfxia authored Aug 27, 2020
1 parent 330e788 commit 81e3142
Showing 7 changed files with 341 additions and 21 deletions.
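
The feature in one example: PARTITION BY NULL creates a stream whose output records carry no key. The sketch below issues the exact statements from the query-validation test added in this commit; the surrounding ksqlDB Java client harness (Client, ClientOptions, executeStatement, localhost:8088) is an assumption for illustration and not part of this change, and the client API may postdate this commit.

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;

public class KeylessStreamExample {
  public static void main(String[] args) throws Exception {
    // Assumed: a ksqlDB server reachable on localhost:8088.
    ClientOptions options = ClientOptions.create()
        .setHost("localhost")
        .setPort(8088);
    Client client = Client.create(options);

    // Statements taken from the query-validation test added below.
    client.executeStatement(
        "CREATE STREAM INPUT (ID INT KEY, NAME STRING) "
            + "WITH (kafka_topic='input', value_format='JSON');").get();

    // PARTITION BY NULL: OUTPUT keeps ID and NAME as value columns,
    // but every record written to the OUTPUT topic has a null key.
    client.executeStatement(
        "CREATE STREAM OUTPUT AS SELECT ID, NAME FROM INPUT PARTITION BY NULL;").get();

    client.close();
  }
}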
@@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
@@ -64,7 +65,8 @@ public Stream<ColumnName> resolveSelectStar(

@Override
void validateKeyPresent(final SourceName sinkName, final Projection projection) {
-    if (!projection.containsExpression(getPartitionBy())) {
+    final Expression partitionBy = getPartitionBy();
+    if (!(partitionBy instanceof NullLiteral) && !projection.containsExpression(partitionBy)) {
final ImmutableList<Expression> keys = ImmutableList.of(originalPartitionBy);
throwKeysNotIncludedError(sinkName, "partitioning expression", keys);
}
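
This analysis-side change is what makes the new syntax legal: a PARTITION BY expression normally has to appear in the projection, but a NullLiteral now bypasses the key-presence check. A minimal runnable sketch of that guard, using stub Expression and projection types (Java 16+ records) rather than the real ksqlDB classes:

import java.util.List;

public class ValidateKeyPresentSketch {
  // Stand-ins for ksqlDB's Expression hierarchy.
  interface Expression {}
  static final class NullLiteral implements Expression {}
  record ColumnRef(String name) implements Expression {}

  // Mirrors the new guard: PARTITION BY NULL skips the key-presence check.
  static void validateKeyPresent(List<Expression> projection, Expression partitionBy) {
    if (!(partitionBy instanceof NullLiteral) && !projection.contains(partitionBy)) {
      throw new IllegalStateException(
          "Key missing from projection: partitioning expression must be selected");
    }
  }

  public static void main(String[] args) {
    List<Expression> projection = List.of(new ColumnRef("NAME"));
    validateKeyPresent(projection, new NullLiteral()); // passes: a keyless sink needs no key column
    try {
      validateKeyPresent(projection, new ColumnRef("ID")); // ID is not in the projection
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}

With the real classes, the NullLiteral branch is what lets CREATE STREAM ... PARTITION BY NULL pass validation even though null is never projected.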
@@ -0,0 +1,146 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID INTEGER KEY, NAME STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` INTEGER KEY, `NAME` STRING",
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n INPUT.NAME NAME\nFROM INPUT INPUT\nPARTITION BY null\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` INTEGER, `NAME` STRING",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSelectKeyV2",
"properties" : {
"queryContext" : "PartitionBy"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` INTEGER KEY, `NAME` STRING"
},
"keyExpression" : "null"
},
"selectExpressions" : [ "ID AS ID", "NAME AS NAME" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.streams.max.task.idle.ms" : "0",
"ksql.query.error.max.queue.size" : "10",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.enable.metastore.backup" : "false",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
@@ -0,0 +1,88 @@
{
"version" : "6.1.0",
"timestamp" : 1598409518833,
"path" : "query-validation-tests/partition-by.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
"schema" : "`ID` INTEGER KEY, `NAME` STRING",
"serdeOptions" : [ ]
},
"CSAS_OUTPUT_0.OUTPUT" : {
"schema" : "`ID` INTEGER, `NAME` STRING",
"serdeOptions" : [ ]
}
},
"testCase" : {
"name" : "null",
"inputs" : [ {
"topic" : "input",
"key" : 10,
"value" : {
"NAME" : "bob"
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : null,
"value" : {
"NAME" : "bob",
"ID" : 10
}
} ],
"topics" : [ {
"name" : "input",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT (ID INT KEY, NAME STRING) with (kafka_topic='input', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, NAME from INPUT partition by null;" ],
"post" : {
"sources" : [ {
"name" : "OUTPUT",
"type" : "STREAM",
"schema" : "`ID` INTEGER, `NAME` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"serdeOptions" : [ ]
}, {
"name" : "INPUT",
"type" : "STREAM",
"schema" : "`ID` INTEGER KEY, `NAME` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"serdeOptions" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "input",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ],
"blackList" : ".*-repartition"
}
}
}
}
@@ -0,0 +1,16 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> PartitionBy-SelectKey
<-- KSTREAM-SOURCE-0000000000
Processor: PartitionBy-SelectKey (stores: [])
--> Project
<-- KSTREAM-TRANSFORMVALUES-0000000001
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000004
<-- PartitionBy-SelectKey
Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT)
<-- Project

@@ -175,6 +175,23 @@
]
}
},
{
"name": "null",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, NAME STRING) with (kafka_topic='input', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ID, NAME from INPUT partition by null;"
],
"inputs": [{"topic": "input", "key": 10, "value": {"NAME": "bob"}}],
"outputs": [{"topic": "OUTPUT", "key": null, "value": {"NAME": "bob", "ID": 10}}],
"post": {
"topics": {
"blacklist": ".*-repartition"
},
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "ID INT, NAME STRING"}
]
}
},
{
"name": "key in projection more than once",
"statements": [
@@ -20,6 +20,7 @@
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
import io.confluent.ksql.execution.util.ExpressionTypeManager;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
@@ -51,12 +52,17 @@
* Note: the value columns do not need to change.
*
* <p>When PARTITIONing BY any other type of expression no column can be removed from the logical
- * schema's value columns. The PARTITION BY expression is creating a <i>new</i> column. Hence, the
- * existing key column(s) are moved to the value schema and a <i>new</i> key column is added, e.g.
- * logically {@code A => B, C}, when {@code PARTITION BY exp}, becomes {@code KSQL_COL_0 => B, C, A}
- * However, processing schemas contain a copy of the key columns in the value, so actually {@code
- * A => B, C, A} becomes {@code KSQL_COL_0 => B, C, A, KSQL_COL_0}. Note: the value column only has
- * the new key column added.
+ * schema's value columns. The PARTITION BY expression is creating a <i>new</i> column (except in
+ * the case of PARTITION BY NULL -- see below). Hence, the existing key column(s) are moved to the
+ * value schema and a <i>new</i> key column is added, e.g. logically {@code A => B, C}, when
+ * {@code PARTITION BY exp}, becomes {@code KSQL_COL_0 => B, C, A}. However, processing schemas
+ * contain a copy of the key columns in the value, so actually {@code A => B, C, A} becomes
+ * {@code KSQL_COL_0 => B, C, A, KSQL_COL_0}. Note: the value column only has the new key column
+ * added.
+ *
+ * <p>When PARTITIONing BY NULL, the existing key column(s) are moved into the value schema and
+ * the new key is null. Because processing schemas contain a copy of the key columns in the value,
+ * the value columns do not need to change. Instead, the key is just set to null.
*/
public final class PartitionByParamsFactory {

@@ -72,19 +78,24 @@ public static PartitionByParams build(
) {
final Optional<ColumnName> partitionByCol = getPartitionByColumnName(sourceSchema, partitionBy);

-    final Function<GenericRow, Object> evaluator = buildExpressionEvaluator(
-        sourceSchema,
-        partitionBy,
-        ksqlConfig,
-        functionRegistry,
-        logger
-    );
-
     final LogicalSchema resultSchema =
         buildSchema(sourceSchema, partitionBy, functionRegistry, partitionByCol);

-    final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper =
-        buildMapper(resultSchema, partitionByCol, evaluator);
+    final BiFunction<Object, GenericRow, KeyValue<Struct, GenericRow>> mapper;
+    if (partitionBy instanceof NullLiteral) {
+      // In case of PARTITION BY NULL, it is sufficient to set the new key to null as the old key
+      // is already present in the current value
+      mapper = (k, v) -> new KeyValue<>(null, v);
+    } else {
+      final Function<GenericRow, Object> evaluator = buildExpressionEvaluator(
+          sourceSchema,
+          partitionBy,
+          ksqlConfig,
+          functionRegistry,
+          logger
+      );
+      mapper = buildMapper(resultSchema, partitionByCol, evaluator);
+    }

return new PartitionByParams(resultSchema, mapper);
}
@@ -115,11 +126,13 @@ private static LogicalSchema buildSchema(
final ColumnName newKeyName = partitionByCol
.orElseGet(() -> ColumnNames.uniqueAliasFor(partitionBy, sourceSchema));

-    final Builder builder = LogicalSchema.builder()
-        .keyColumn(newKeyName, keyType)
-        .valueColumns(sourceSchema.value());
+    final Builder builder = LogicalSchema.builder();
+    if (keyType != null) {
+      builder.keyColumn(newKeyName, keyType);
+    }
+    builder.valueColumns(sourceSchema.value());

-    if (!partitionByCol.isPresent()) {
+    if (keyType != null && !partitionByCol.isPresent()) {
// New key column added, copy in to value schema:
builder.valueColumn(newKeyName, keyType);
}
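
Taken together, the two changes above mean PARTITION BY NULL needs no expression evaluator: buildSchema emits a result schema with no key column (keyType is null), and the mapper only nulls out the key, since the old key is already copied into the value columns of the processing schema. A condensed sketch of that behavior, using stand-in Schema and KeyValue types (assumptions for illustration, not the real LogicalSchema or Kafka Streams classes; Java 16+ records):

import java.util.List;
import java.util.function.BiFunction;

public class PartitionByNullSketch {
  // Stand-in for Kafka Streams' KeyValue pair.
  record KeyValue<K, V>(K key, V value) {}

  // Stand-in result schema: key columns plus value columns.
  record Schema(List<String> keyColumns, List<String> valueColumns) {}

  // Mirrors buildSchema(): no key column is added in the PARTITION BY NULL
  // case; value columns are carried over from the source unchanged.
  static Schema buildSchema(Schema source, boolean partitionByNull) {
    List<String> keys = partitionByNull ? List.of() : List.of("KSQL_COL_0");
    return new Schema(keys, source.valueColumns());
  }

  // Mirrors the new mapper branch: just replace the key with null.
  static <K, V> BiFunction<K, V, KeyValue<Object, V>> nullKeyMapper() {
    return (k, v) -> new KeyValue<>(null, v);
  }

  public static void main(String[] args) {
    Schema source = new Schema(List.of("ID"), List.of("ID", "NAME"));
    System.out.println(buildSchema(source, true));
    // Schema[keyColumns=[], valueColumns=[ID, NAME]]

    List<Object> row = List.of(10, "bob");
    System.out.println(nullKeyMapper().apply(10, row));
    // KeyValue[key=null, value=[10, bob]] -- matches the test: input key 10, output key null
  }
}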
