diff --git a/docs/developer-guide/serialization.md b/docs/developer-guide/serialization.md index 7217be3874fe..3cfb4f93e01a 100644 --- a/docs/developer-guide/serialization.md +++ b/docs/developer-guide/serialization.md @@ -47,7 +47,7 @@ currently supported as key formats. See individual formats for details. | Feature | Supported | |------------------------------|-----------| | As value format | Yes | -| As key format | No | +| As key format | Yes | | [Schema Registry required][0]| No | | [Schema inference][1] | No | | [Single field wrapping][2] | No | diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/plan.json new file mode 100644 index 000000000000..cb586a1f8632 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/plan.json @@ -0,0 +1,141 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (K BIGINT KEY, V INTEGER) WITH (FORMAT='DELIMITED', KAFKA_TOPIC='input_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`K` BIGINT KEY, `V` INTEGER", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` BIGINT KEY, `V` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` BIGINT KEY, `V` INTEGER" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "V AS V" ] + }, + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/spec.json new file mode 100644 index 000000000000..b9e1585b9e22 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/spec.json @@ -0,0 +1,96 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601555627743, + "path" : "query-validation-tests/delimited.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` BIGINT KEY, `V` INTEGER" + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` BIGINT KEY, `V` INTEGER" + } + }, + "testCase" : { + "name" : "BIGINT - key", + "inputs" : [ { + "topic" : "input_topic", + "key" : "998877665544332211", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : "\"10\"", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : null, + "value" : "0" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "998877665544332211", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : "10", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : "0" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (K BIGINT KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`K` BIGINT KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` BIGINT KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BIGINT_-_key/6.1.0_1601555627743/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/plan.json new file mode 100644 index 000000000000..04be39e48651 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/plan.json @@ -0,0 +1,141 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (K BOOLEAN KEY, V INTEGER) WITH (FORMAT='DELIMITED', KAFKA_TOPIC='input_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`K` BOOLEAN KEY, `V` INTEGER", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` BOOLEAN KEY, `V` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` BOOLEAN KEY, `V` INTEGER" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "V AS V" ] + }, + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/spec.json new file mode 100644 index 000000000000..5980143d80f1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/spec.json @@ -0,0 +1,104 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601555627700, + "path" : "query-validation-tests/delimited.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` BOOLEAN KEY, `V` INTEGER" + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` BOOLEAN KEY, `V` INTEGER" + } + }, + "testCase" : { + "name" : "BOOLEAN - key", + "inputs" : [ { + "topic" : "input_topic", + "key" : "true", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : "\"true\"", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : "false", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : null, + "value" : "0" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "true", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : "true", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : "false", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : "0" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (K BOOLEAN KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`K` BOOLEAN KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` BOOLEAN KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_BOOLEAN_-_key/6.1.0_1601555627700/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/plan.json new file mode 100644 index 000000000000..7ab3e204ff70 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/plan.json @@ -0,0 +1,141 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (K DECIMAL(6, 4) KEY, V INTEGER) WITH (FORMAT='DELIMITED', KAFKA_TOPIC='input_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`K` DECIMAL(6, 4) KEY, `V` INTEGER", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` DECIMAL(6, 4) KEY, `V` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` DECIMAL(6, 4) KEY, `V` INTEGER" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "V AS V" ] + }, + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/spec.json new file mode 100644 index 000000000000..ce2e3cba77ee --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/spec.json @@ -0,0 +1,96 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601555627789, + "path" : "query-validation-tests/delimited.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` DECIMAL(6, 4) KEY, `V` INTEGER" + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` DECIMAL(6, 4) KEY, `V` INTEGER" + } + }, + "testCase" : { + "name" : "DECIMAL - key", + "inputs" : [ { + "topic" : "input_topic", + "key" : "12.3650", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : "\"12.0\"", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : null, + "value" : "0" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "12.3650", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : "12.0000", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : "0" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (K DECIMAL(6,4) KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`K` DECIMAL(6, 4) KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` DECIMAL(6, 4) KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DECIMAL_-_key/6.1.0_1601555627789/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/plan.json new file mode 100644 index 000000000000..d4e6ab23ea69 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/plan.json @@ -0,0 +1,141 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (K DOUBLE KEY, V INTEGER) WITH (FORMAT='DELIMITED', KAFKA_TOPIC='input_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`K` DOUBLE KEY, `V` INTEGER", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` DOUBLE KEY, `V` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` DOUBLE KEY, `V` INTEGER" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "V AS V" ] + }, + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/spec.json new file mode 100644 index 000000000000..edb6a19d9d25 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/spec.json @@ -0,0 +1,96 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601555627757, + "path" : "query-validation-tests/delimited.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` DOUBLE KEY, `V` INTEGER" + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` DOUBLE KEY, `V` INTEGER" + } + }, + "testCase" : { + "name" : "DOUBLE - key", + "inputs" : [ { + "topic" : "input_topic", + "key" : "654.321", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : "\"123.456\"", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : null, + "value" : "0" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "654.321", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : "123.456", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : "0" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (K DOUBLE KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`K` DOUBLE KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` DOUBLE KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_DOUBLE_-_key/6.1.0_1601555627757/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/plan.json new file mode 100644 index 000000000000..b27a203b8e07 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/plan.json @@ -0,0 +1,141 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (K INTEGER KEY, V INTEGER) WITH (FORMAT='DELIMITED', KAFKA_TOPIC='input_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`K` INTEGER KEY, `V` INTEGER", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` INTEGER KEY, `V` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` INTEGER KEY, `V` INTEGER" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "V AS V" ] + }, + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/spec.json new file mode 100644 index 000000000000..76fb06126b86 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/spec.json @@ -0,0 +1,96 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601555627728, + "path" : "query-validation-tests/delimited.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` INTEGER KEY, `V` INTEGER" + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` INTEGER KEY, `V` INTEGER" + } + }, + "testCase" : { + "name" : "INT - key", + "inputs" : [ { + "topic" : "input_topic", + "key" : "33", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : "\"22\"", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : null, + "value" : "0" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "33", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : "22", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : "0" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (K INT KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`K` INTEGER KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` INTEGER KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_INT_-_key/6.1.0_1601555627728/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/plan.json new file mode 100644 index 000000000000..1b1cbfce9496 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/plan.json @@ -0,0 +1,141 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (K STRING KEY, V INTEGER) WITH (FORMAT='DELIMITED', KAFKA_TOPIC='input_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`K` STRING KEY, `V` INTEGER", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `V` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` STRING KEY, `V` INTEGER" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "V AS V" ] + }, + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/spec.json new file mode 100644 index 000000000000..dd72f7ab2a92 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/spec.json @@ -0,0 +1,96 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601555627776, + "path" : "query-validation-tests/delimited.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `V` INTEGER" + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `V` INTEGER" + } + }, + "testCase" : { + "name" : "STRING - key", + "inputs" : [ { + "topic" : "input_topic", + "key" : "Hey", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : "\"You\"", + "value" : "0" + }, { + "topic" : "input_topic", + "key" : null, + "value" : "0" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "Hey", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : "You", + "value" : "0" + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : "0" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (K STRING KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_STRING_-_key/6.1.0_1601555627776/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/plan.json new file mode 100644 index 000000000000..e511b66e6fcb --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V INTEGER) WITH (FORMAT='DELIMITED', KAFKA_TOPIC='input_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`V` INTEGER", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`V` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`V` INTEGER" + }, + "selectExpressions" : [ "V AS V" ] + }, + "formats" : { + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.key.format.enabled" : "false", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/spec.json new file mode 100644 index 000000000000..fdedfc7c0e4a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/spec.json @@ -0,0 +1,80 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601555628104, + "path" : "query-validation-tests/delimited.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`V` INTEGER" + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`V` INTEGER" + } + }, + "testCase" : { + "name" : "keyless", + "inputs" : [ { + "topic" : "input_topic", + "key" : null, + "value" : "0" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : "0" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`V` INTEGER", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "DELIMITED" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/delimited_-_keyless/6.1.0_1601555628104/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/delimited.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/delimited.json index ad2cc3de0c3b..65b827954ce0 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/delimited.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/delimited.json @@ -3,6 +3,140 @@ "When using value_format DELIMITED, we can define VALUE_DELIMITER as custom character." ], "tests": [ + { + "name": "BOOLEAN - key", + "statements": [ + "CREATE STREAM INPUT (K BOOLEAN KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": "true", "value": "0"}, + {"topic": "input_topic", "key": "\"true\"", "value": "0"}, + {"topic": "input_topic", "key": "false", "value": "0"}, + {"topic": "input_topic", "key": null, "value": "0"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "true", "value": "0"}, + {"topic": "OUTPUT", "key": "true", "value": "0"}, + {"topic": "OUTPUT", "key": "false", "value": "0"}, + {"topic": "OUTPUT", "key": null, "value": "0"} + ] + }, + { + "name": "INT - key", + "statements": [ + "CREATE STREAM INPUT (K INT KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": "33", "value": "0"}, + {"topic": "input_topic", "key": "\"22\"", "value": "0"}, + {"topic": "input_topic", "key": null, "value": "0"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "33", "value": "0"}, + {"topic": "OUTPUT", "key": "22", "value": "0"}, + {"topic": "OUTPUT", "key": null, "value": "0"} + ] + }, + { + "name": "BIGINT - key", + "statements": [ + "CREATE STREAM INPUT (K BIGINT KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": "998877665544332211", "value": "0"}, + {"topic": "input_topic", "key": "\"10\"", "value": "0"}, + {"topic": "input_topic", "key": null, "value": "0"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "998877665544332211", "value": "0"}, + {"topic": "OUTPUT", "key": "10", "value": "0"}, + {"topic": "OUTPUT", "key": null, "value": "0"} + ] + }, + { + "name": "DOUBLE - key", + "statements": [ + "CREATE STREAM INPUT (K DOUBLE KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": "654.321", "value": "0"}, + {"topic": "input_topic", "key": "\"123.456\"", "value": "0"}, + {"topic": "input_topic", "key": null, "value": "0"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "654.321", "value": "0"}, + {"topic": "OUTPUT", "key": "123.456", "value": "0"}, + {"topic": "OUTPUT", "key": null, "value": "0"} + ] + }, + { + "name": "STRING - key", + "statements": [ + "CREATE STREAM INPUT (K STRING KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": "Hey", "value": "0"}, + {"topic": "input_topic", "key": "\"You\"", "value": "0"}, + {"topic": "input_topic", "key": null, "value": "0"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "Hey", "value": "0"}, + {"topic": "OUTPUT", "key": "You", "value": "0"}, + {"topic": "OUTPUT", "key": null, "value": "0"} + ] + }, + { + "name": "DECIMAL - key", + "statements": [ + "CREATE STREAM INPUT (K DECIMAL(6,4) KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": "12.3650", "value": "0"}, + {"topic": "input_topic", "key": "\"12.0\"", "value": "0"}, + {"topic": "input_topic", "key": null, "value": "0"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "12.3650", "value": "0"}, + {"topic": "OUTPUT", "key": "12.0000", "value": "0"}, + {"topic": "OUTPUT", "key": null, "value": "0"} + ] + }, + { + "name": "ARRAY - key", + "statements": [ + "CREATE STREAM INPUT (K ARRAY KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Unsupported key schema: [`K` ARRAY KEY]" + } + }, + { + "name": "MAP - key", + "statements": [ + "CREATE STREAM INPUT (K MAP KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Unsupported key schema: [`K` MAP KEY]" + } + }, + { + "name": "STRUCT - key", + "statements": [ + "CREATE STREAM INPUT (K STRUCT KEY, V INT) WITH (kafka_topic='input_topic', format='DELIMITED');" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Unsupported key schema: [`K` STRUCT<`F` DOUBLE> KEY]" + } + }, { "name": "deserialize anonymous primitive by default - value - DELIMITED", "comments": "DELIMITED supports anonymous primitives by default", @@ -261,6 +395,19 @@ {"topic": "OUTPUT", "value": "\"-12345.1200000000000000\""}, {"topic": "OUTPUT", "value": "0.0000000000000000"} ] + }, + { + "name": "keyless", + "statements": [ + "CREATE STREAM INPUT (V INT) WITH (kafka_topic='input_topic', format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": "0"} + ], + "outputs": [ + {"topic": "OUTPUT", "value": "0"} + ] } ] } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/formats.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/formats.json index ac1700604a2e..99bb60c0446b 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/formats.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/formats.json @@ -199,11 +199,11 @@ { "name": "unsupported format - create source", "statements": [ - "CREATE STREAM TEST (foo VARCHAR) WITH (kafka_topic='test_topic', format='DELIMITED');" + "CREATE STREAM TEST (foo VARCHAR) WITH (kafka_topic='test_topic', format='PROTOBUF');" ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "The key format 'DELIMITED' is not currently supported." + "message": "The key format 'PROTOBUF' is not currently supported." } }, { diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java index c6d296aeaf42..8808f70263a9 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java @@ -22,7 +22,7 @@ public final class KeyFormatUtils { private static final List SUPPORTED_KEY_FORMATS = - ImmutableList.of(FormatFactory.KAFKA); + ImmutableList.of(FormatFactory.KAFKA, FormatFactory.DELIMITED); private static final List KEY_FORMATS_UNDER_DEVELOPMENT = ImmutableList.of(FormatFactory.JSON);