-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: enable support for
JSON
key format (#6411)
* feat: enable support for JSON keys fixes: #4461 Co-authored-by: Andy Coates <[email protected]>
- Loading branch information
1 parent
7005fb7
commit fe97cde
Showing
77 changed files
with
6,928 additions
and
79 deletions.
There are no files selected for viewing
145 changes: 145 additions & 0 deletions
145
...ests/src/test/resources/historical_plans/as_value_-_BOOLEAN/6.1.0_1602595878201/plan.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
{ | ||
"plan" : [ { | ||
"@type" : "ksqlPlanV1", | ||
"statementText" : "CREATE STREAM INPUT (ID BOOLEAN KEY, V0 INTEGER, V1 INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", | ||
"ddlCommand" : { | ||
"@type" : "createStreamV1", | ||
"sourceName" : "INPUT", | ||
"schema" : "`ID` BOOLEAN KEY, `V0` INTEGER, `V1` INTEGER", | ||
"topicName" : "input", | ||
"formats" : { | ||
"keyFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"keyFeatures" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"orReplace" : false | ||
} | ||
}, { | ||
"@type" : "ksqlPlanV1", | ||
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n AS_VALUE(INPUT.ID) ID_COPY,\n INPUT.V1 V1\nFROM INPUT INPUT\nEMIT CHANGES", | ||
"ddlCommand" : { | ||
"@type" : "createStreamV1", | ||
"sourceName" : "OUTPUT", | ||
"schema" : "`ID` BOOLEAN KEY, `ID_COPY` BOOLEAN, `V1` INTEGER", | ||
"topicName" : "OUTPUT", | ||
"formats" : { | ||
"keyFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"keyFeatures" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"orReplace" : false | ||
}, | ||
"queryPlan" : { | ||
"sources" : [ "INPUT" ], | ||
"sink" : "OUTPUT", | ||
"physicalPlan" : { | ||
"@type" : "streamSinkV1", | ||
"properties" : { | ||
"queryContext" : "OUTPUT" | ||
}, | ||
"source" : { | ||
"@type" : "streamSelectV1", | ||
"properties" : { | ||
"queryContext" : "Project" | ||
}, | ||
"source" : { | ||
"@type" : "streamSourceV1", | ||
"properties" : { | ||
"queryContext" : "KsqlTopic/Source" | ||
}, | ||
"topicName" : "input", | ||
"formats" : { | ||
"keyFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"keyFeatures" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"sourceSchema" : "`ID` BOOLEAN KEY, `V0` INTEGER, `V1` INTEGER" | ||
}, | ||
"keyColumnNames" : [ "ID" ], | ||
"selectExpressions" : [ "AS_VALUE(ID) AS ID_COPY", "V1 AS V1" ] | ||
}, | ||
"formats" : { | ||
"keyFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"keyFeatures" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"topicName" : "OUTPUT" | ||
}, | ||
"queryId" : "CSAS_OUTPUT_0" | ||
} | ||
} ], | ||
"configs" : { | ||
"ksql.extension.dir" : "ext", | ||
"ksql.streams.cache.max.bytes.buffering" : "0", | ||
"ksql.security.extension.class" : null, | ||
"metric.reporters" : "", | ||
"ksql.transient.prefix" : "transient_", | ||
"ksql.query.status.running.threshold.seconds" : "300", | ||
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", | ||
"ksql.output.topic.name.prefix" : "", | ||
"ksql.query.pull.enable.standby.reads" : "false", | ||
"ksql.persistence.default.format.key" : "KAFKA", | ||
"ksql.query.error.max.queue.size" : "10", | ||
"ksql.internal.topic.min.insync.replicas" : "1", | ||
"ksql.streams.shutdown.timeout.ms" : "300000", | ||
"ksql.internal.topic.replicas" : "1", | ||
"ksql.insert.into.values.enabled" : "true", | ||
"ksql.key.format.enabled" : "false", | ||
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", | ||
"ksql.query.pull.max.qps" : "2147483647", | ||
"ksql.access.validator.enable" : "auto", | ||
"ksql.streams.bootstrap.servers" : "localhost:0", | ||
"ksql.query.pull.metrics.enabled" : "false", | ||
"ksql.create.or.replace.enabled" : "true", | ||
"ksql.metrics.extension" : null, | ||
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", | ||
"ksql.cast.strings.preserve.nulls" : "true", | ||
"ksql.authorization.cache.max.entries" : "10000", | ||
"ksql.pull.queries.enable" : "true", | ||
"ksql.suppress.enabled" : "false", | ||
"ksql.sink.window.change.log.additional.retention" : "1000000", | ||
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", | ||
"ksql.query.persistent.active.limit" : "2147483647", | ||
"ksql.persistence.wrap.single.values" : null, | ||
"ksql.authorization.cache.expiry.time.secs" : "30", | ||
"ksql.query.retry.backoff.initial.ms" : "15000", | ||
"ksql.schema.registry.url" : "", | ||
"ksql.properties.overrides.denylist" : "", | ||
"ksql.streams.auto.offset.reset" : "earliest", | ||
"ksql.connect.url" : "http://localhost:8083", | ||
"ksql.service.id" : "some.ksql.service.id", | ||
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", | ||
"ksql.streams.commit.interval.ms" : "2000", | ||
"ksql.streams.auto.commit.interval.ms" : "0", | ||
"ksql.streams.topology.optimization" : "all", | ||
"ksql.query.retry.backoff.max.ms" : "900000", | ||
"ksql.streams.num.stream.threads" : "4", | ||
"ksql.timestamp.throw.on.invalid" : "false", | ||
"ksql.metrics.tags.custom" : "", | ||
"ksql.persistence.default.format.value" : null, | ||
"ksql.udfs.enabled" : "true", | ||
"ksql.udf.enable.security.manager" : "true", | ||
"ksql.connect.worker.config" : "", | ||
"ksql.udf.collect.metrics" : "false", | ||
"ksql.persistent.prefix" : "query_", | ||
"ksql.metastore.backup.location" : "", | ||
"ksql.error.classifier.regex" : "", | ||
"ksql.suppress.buffer.size.bytes" : "-1" | ||
} | ||
} |
102 changes: 102 additions & 0 deletions
102
...ests/src/test/resources/historical_plans/as_value_-_BOOLEAN/6.1.0_1602595878201/spec.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
{ | ||
"version" : "6.1.0", | ||
"timestamp" : 1602595878201, | ||
"path" : "query-validation-tests/as_value.json", | ||
"schemas" : { | ||
"CSAS_OUTPUT_0.KsqlTopic.Source" : { | ||
"schema" : "`ID` BOOLEAN KEY, `V0` INTEGER, `V1` INTEGER", | ||
"keyFormat" : { | ||
"format" : "JSON", | ||
"features" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
} | ||
}, | ||
"CSAS_OUTPUT_0.OUTPUT" : { | ||
"schema" : "`ID` BOOLEAN KEY, `ID_COPY` BOOLEAN, `V1` INTEGER", | ||
"keyFormat" : { | ||
"format" : "JSON", | ||
"features" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
} | ||
} | ||
}, | ||
"testCase" : { | ||
"name" : "BOOLEAN", | ||
"inputs" : [ { | ||
"topic" : "input", | ||
"key" : true, | ||
"value" : { | ||
"V0" : 2, | ||
"V1" : 3 | ||
} | ||
} ], | ||
"outputs" : [ { | ||
"topic" : "OUTPUT", | ||
"key" : true, | ||
"value" : { | ||
"ID_COPY" : true, | ||
"V1" : 3 | ||
} | ||
} ], | ||
"topics" : [ { | ||
"name" : "input", | ||
"replicas" : 1, | ||
"numPartitions" : 4 | ||
}, { | ||
"name" : "OUTPUT", | ||
"replicas" : 1, | ||
"numPartitions" : 4 | ||
} ], | ||
"statements" : [ "CREATE STREAM INPUT (ID BOOLEAN KEY, V0 INT, V1 INT) WITH (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, AS_VALUE(ID) AS ID_COPY, V1 FROM INPUT;" ], | ||
"post" : { | ||
"sources" : [ { | ||
"name" : "INPUT", | ||
"type" : "STREAM", | ||
"schema" : "`ID` BOOLEAN KEY, `V0` INTEGER, `V1` INTEGER", | ||
"keyFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"valueFormat" : "JSON", | ||
"keyFeatures" : [ "UNWRAP_SINGLES" ], | ||
"valueFeatures" : [ ] | ||
}, { | ||
"name" : "OUTPUT", | ||
"type" : "STREAM", | ||
"schema" : "`ID` BOOLEAN KEY, `ID_COPY` BOOLEAN, `V1` INTEGER", | ||
"keyFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"valueFormat" : "JSON", | ||
"keyFeatures" : [ "UNWRAP_SINGLES" ], | ||
"valueFeatures" : [ ] | ||
} ], | ||
"topics" : { | ||
"topics" : [ { | ||
"name" : "input", | ||
"keyFormat" : { | ||
"format" : "JSON", | ||
"features" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"partitions" : 4 | ||
}, { | ||
"name" : "OUTPUT", | ||
"keyFormat" : { | ||
"format" : "JSON", | ||
"features" : [ "UNWRAP_SINGLES" ] | ||
}, | ||
"valueFormat" : { | ||
"format" : "JSON" | ||
}, | ||
"partitions" : 4 | ||
} ] | ||
} | ||
} | ||
} | ||
} |
13 changes: 13 additions & 0 deletions
13
...tests/src/test/resources/historical_plans/as_value_-_BOOLEAN/6.1.0_1602595878201/topology
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
Topologies: | ||
Sub-topology: 0 | ||
Source: KSTREAM-SOURCE-0000000000 (topics: [input]) | ||
--> KSTREAM-TRANSFORMVALUES-0000000001 | ||
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) | ||
--> Project | ||
<-- KSTREAM-SOURCE-0000000000 | ||
Processor: Project (stores: []) | ||
--> KSTREAM-SINK-0000000003 | ||
<-- KSTREAM-TRANSFORMVALUES-0000000001 | ||
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) | ||
<-- Project | ||
|
Oops, something went wrong.