diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java
index 9a73a9c7a0be..9a8b18442045 100644
--- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java
+++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/json/ValueSpecJsonSerdeSupplier.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.ksql.serde.json.JsonSerdeUtils;
 import io.confluent.ksql.test.serde.SerdeSupplier;
 import java.math.BigDecimal;
 import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 
@@ -38,6 +40,11 @@ public class ValueSpecJsonSerdeSupplier implements SerdeSupplier<Object> {
   private static final ObjectMapper MAPPER = new ObjectMapper()
       .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
 
+  private final boolean useSchemas;
+
+  public ValueSpecJsonSerdeSupplier(final boolean useSchemas) {
+    this.useSchemas = useSchemas;
+  }
 
   @Override
   public Serializer<Object> getSerializer(final SchemaRegistryClient schemaRegistryClient) {
@@ -49,7 +56,7 @@ public Deserializer<Object> getDeserializer(final SchemaRegistryClient schemaReg
     return new ValueSpecJsonDeserializer();
   }
 
-  private static final class ValueSpecJsonSerializer implements Serializer<Object> {
+  private final class ValueSpecJsonSerializer implements Serializer<Object> {
     @Override
     public void close() {
     }
@@ -65,14 +72,21 @@ public byte[] serialize(final String topicName, final Object spec) {
       }
       try {
         final Object toSerialize = Converter.toJsonNode(spec);
-        return MAPPER.writeValueAsBytes(toSerialize);
+        final byte[] bytes = MAPPER.writeValueAsBytes(toSerialize);
+        if (!useSchemas) {
+          return bytes;
+        }
+
+        return ArrayUtils.addAll(
+            new byte[]{/*magic*/ 0x00, /*schemaID*/ 0x00, 0x00, 0x00, 0x01},
+            bytes);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
     }
   }
 
-  private static final class ValueSpecJsonDeserializer implements Deserializer<Object> {
+  private final class ValueSpecJsonDeserializer implements Deserializer<Object> {
     @Override
     public void close() {
     }
@@ -87,7 +101,9 @@ public Object deserialize(final String topicName, final byte[] data) {
         return null;
       }
       try {
-        return MAPPER.readValue(data, Object.class);
+        return useSchemas
+            ? JsonSerdeUtils.readJsonSR(data, MAPPER, Object.class)
+            : MAPPER.readValue(data, Object.class);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java
index 39a9670dd193..bbd98bb885fa 100644
--- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java
+++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java
@@ -32,6 +32,7 @@ import io.confluent.ksql.serde.avro.AvroFormat;
 import io.confluent.ksql.serde.delimited.DelimitedFormat;
 import io.confluent.ksql.serde.json.JsonFormat;
+import io.confluent.ksql.serde.json.JsonSchemaFormat;
 import io.confluent.ksql.serde.kafka.KafkaFormat;
 import io.confluent.ksql.serde.protobuf.ProtobufFormat;
 import io.confluent.ksql.test.serde.SerdeSupplier;
@@ -66,7 +67,8 @@ public static SerdeSupplier<?> getSerdeSupplier(
     switch (format.name()) {
       case AvroFormat.NAME: return new ValueSpecAvroSerdeSupplier();
       case ProtobufFormat.NAME: return new ValueSpecProtobufSerdeSupplier();
-      case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier();
+      case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false);
+      case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true);
       case DelimitedFormat.NAME: return new StringSerdeSupplier();
       case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema);
       default:
@@ -89,7 +91,8 @@ public static Optional<ParsedSchema> buildSchema(final JsonNode schema, final St
           new AvroFormat()
               .toParsedSchema(new AvroData(1).toConnectSchema(avroSchema))
       );
-    } else if (format.equalsIgnoreCase(JsonFormat.NAME)) {
+    } else if (format.equalsIgnoreCase(JsonFormat.NAME)
+        || format.equalsIgnoreCase(JsonSchemaFormat.NAME)) {
       final String schemaString = OBJECT_MAPPER.writeValueAsString(schema);
       return Optional.of(new JsonSchema(schemaString));
     } else if (format.equalsIgnoreCase(ProtobufFormat.NAME)) {
diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_array_element_OK_-_JSON_SR/5.5.0_1582741261090/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_array_element_OK_-_JSON_SR/5.5.0_1582741261090/spec.json
new file mode 100644
index 000000000000..2b510e7eed57
--- /dev/null
+++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_array_element_OK_-_JSON_SR/5.5.0_1582741261090/spec.json
@@ -0,0 +1,150 @@
+{
+  "version" : "5.5.0",
+  "timestamp" : 1582741261090,
+  "plan" : [ {
+    "@type" : "ksqlPlanV1",
+    "statementText" : "CREATE STREAM INPUT (V0 ARRAY) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');",
+    "ddlCommand" : {
+      "@type" : "createStreamV1",
+      "sourceName" : "INPUT",
+      "schema" : "`ROWKEY` STRING KEY, `V0` ARRAY",
+      "keyField" : null,
+      "timestampColumn" : null,
+      "topicName" : "input",
+      "formats" : {
+        "keyFormat" : {
+          "format" : "KAFKA",
+          "properties" : { }
+        },
+        "valueFormat" : {
+          "format" : "JSON_SR",
+          "properties" : { }
+        },
+        "options" : [ ]
+      },
+      "windowInfo" : null
+    },
+    "queryPlan" : null
+  }, {
+    "@type" : "ksqlPlanV1",
+    "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
+    "ddlCommand" : {
+      "@type" : "createStreamV1",
+      "sourceName" : "OUTPUT",
+      "schema" : "`ROWKEY` STRING KEY, `V0` ARRAY",
+      "keyField" : null,
+      "timestampColumn" : null,
+      "topicName" : "OUTPUT",
+      "formats" : {
+        "keyFormat" : {
+          "format" : "KAFKA",
+          "properties" : { }
+        },
+
"valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` ARRAY" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : 
"query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_array_element_OK_-_JSON_SR/5.5.0_1582741261090/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_array_element_OK_-_JSON_SR/5.5.0_1582741261090/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_array_element_OK_-_JSON_SR/5.5.0_1582741261090/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_bigint_elements_OK_-_JSON_SR/5.5.0_1582741260793/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_bigint_elements_OK_-_JSON_SR/5.5.0_1582741260793/spec.json new file mode 100644 index 000000000000..0c8794e3b945 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_bigint_elements_OK_-_JSON_SR/5.5.0_1582741260793/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741260793, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 BIGINT) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` BIGINT" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, 
+ "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_bigint_elements_OK_-_JSON_SR/5.5.0_1582741260793/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_bigint_elements_OK_-_JSON_SR/5.5.0_1582741260793/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_bigint_elements_OK_-_JSON_SR/5.5.0_1582741260793/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + 
<-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_boolean_elements_OK_-_JSON_SR/5.5.0_1582741260577/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_boolean_elements_OK_-_JSON_SR/5.5.0_1582741260577/spec.json new file mode 100644 index 000000000000..9a4f2ec57d20 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_boolean_elements_OK_-_JSON_SR/5.5.0_1582741260577/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741260577, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 BOOLEAN) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` BOOLEAN", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` BOOLEAN", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` BOOLEAN" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + 
"ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_boolean_elements_OK_-_JSON_SR/5.5.0_1582741260577/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_boolean_elements_OK_-_JSON_SR/5.5.0_1582741260577/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_boolean_elements_OK_-_JSON_SR/5.5.0_1582741260577/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON/5.5.0_1582741260923/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON/5.5.0_1582741260923/spec.json new file mode 100644 index 000000000000..6b4a54107a59 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON/5.5.0_1582741260923/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741260923, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 DECIMAL(15, 14)) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` DECIMAL(15, 14)", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" 
: { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` DECIMAL(15, 14)", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` DECIMAL(15, 14)" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : 
"_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON/5.5.0_1582741260923/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON/5.5.0_1582741260923/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON/5.5.0_1582741260923/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON_SR/5.5.0_1582741260934/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON_SR/5.5.0_1582741260934/spec.json new file mode 100644 index 000000000000..5d855ad47e22 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON_SR/5.5.0_1582741260934/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741260934, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 DECIMAL(15, 14)) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` DECIMAL(15, 14)", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` DECIMAL(15, 14)", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + 
"properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` DECIMAL(15, 14)" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git 
a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON_SR/5.5.0_1582741260934/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON_SR/5.5.0_1582741260934/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_decimal_elements_OK_-_JSON_SR/5.5.0_1582741260934/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_double_elements_OK_-_JSON_SR/5.5.0_1582741260876/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_double_elements_OK_-_JSON_SR/5.5.0_1582741260876/spec.json new file mode 100644 index 000000000000..2ce61d1b3a72 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_double_elements_OK_-_JSON_SR/5.5.0_1582741260876/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741260876, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 DOUBLE) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` DOUBLE", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` DOUBLE", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` DOUBLE" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + 
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_double_elements_OK_-_JSON_SR/5.5.0_1582741260876/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_double_elements_OK_-_JSON_SR/5.5.0_1582741260876/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_double_elements_OK_-_JSON_SR/5.5.0_1582741260876/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git 
a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_int_elements_OK_-_JSON_SR/5.5.0_1582741260696/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_int_elements_OK_-_JSON_SR/5.5.0_1582741260696/spec.json new file mode 100644 index 000000000000..ad105f552117 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_int_elements_OK_-_JSON_SR/5.5.0_1582741260696/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741260696, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 INTEGER) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` INTEGER" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + 
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_int_elements_OK_-_JSON_SR/5.5.0_1582741260696/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_int_elements_OK_-_JSON_SR/5.5.0_1582741260696/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_int_elements_OK_-_JSON_SR/5.5.0_1582741260696/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_map_element_OK_-_JSON_SR/5.5.0_1582741261149/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_map_element_OK_-_JSON_SR/5.5.0_1582741261149/spec.json new file mode 100644 index 000000000000..891afa470c75 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_map_element_OK_-_JSON_SR/5.5.0_1582741261149/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741261149, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 MAP) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` MAP", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + 
"format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` MAP", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` MAP" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + 
"ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_map_element_OK_-_JSON_SR/5.5.0_1582741261149/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_map_element_OK_-_JSON_SR/5.5.0_1582741261149/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_map_element_OK_-_JSON_SR/5.5.0_1582741261149/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_string_elements_OK_-_JSON_SR/5.5.0_1582741261028/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_string_elements_OK_-_JSON_SR/5.5.0_1582741261028/spec.json new file mode 100644 index 000000000000..5121743ec24d --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_string_elements_OK_-_JSON_SR/5.5.0_1582741261028/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741261028, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 STRING) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` STRING", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + 
"queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` STRING" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_string_elements_OK_-_JSON_SR/5.5.0_1582741261028/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_string_elements_OK_-_JSON_SR/5.5.0_1582741261028/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null 
+++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_string_elements_OK_-_JSON_SR/5.5.0_1582741261028/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_struct_element_OK_-_JSON_SR/5.5.0_1582741261211/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_struct_element_OK_-_JSON_SR/5.5.0_1582741261211/spec.json new file mode 100644 index 000000000000..cf01c39d01d6 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_struct_element_OK_-_JSON_SR/5.5.0_1582741261211/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582741261211, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (V0 STRUCT) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` STRUCT<`F0` STRING, `F1` INTEGER>", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V0` STRUCT<`F0` STRING, `F1` INTEGER>", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `V0` STRUCT<`F0` STRING, `F1` INTEGER>" + }, + "selectExpressions" : [ "V0 AS V0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : 
"transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_struct_element_OK_-_JSON_SR/5.5.0_1582741261211/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_struct_element_OK_-_JSON_SR/5.5.0_1582741261211/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_struct_element_OK_-_JSON_SR/5.5.0_1582741261211/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582223958838/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582223958838/spec.json new file mode 100644 index 
000000000000..48c86a55eb9a --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582223958838/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582223958838, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (C1 BIGINT) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `C1` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT WITH (PARTITIONS=4) AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `C1` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `C1` BIGINT" + }, + "selectExpressions" : [ "C1 AS C1" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7850299614276031628", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + 
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582223958838/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582223958838/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582223958838/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582746701599/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582746701599/spec.json new file mode 100644 index 000000000000..2732454345cd --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582746701599/spec.json @@ -0,0 +1,150 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582746701599, + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 BIGINT) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + 
"@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT" + }, + "selectExpressions" : [ "C1 AS C1" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON_SR", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent5087931939232887546", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : 
"true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582746701599/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582746701599/topology new file mode 100644 index 000000000000..a70a4e91e301 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SR_SCHEMA/5.5.0_1582746701599/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index b4017bcbe244..01fb49ba03df 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -184,6 +184,31 @@ ] } }, + { + "name": "validate without value elements OK - JSON_SR SCHEMA", + "statements": [ + "CREATE STREAM INPUT (rowkey int key) WITH (kafka_topic='input', value_format='JSON_SR');", + "CREATE STREAM OUTPUT AS SELECT * FROM input;" + ], + "topics": [ + { + "name": "input", + "schema": {"type": "object","properties": {"c1": {"type": "integer"}}}, + "format": "JSON" + }, + { + "name": "OUTPUT", + "format": "JSON" + } + ], + "inputs": [{"topic": "input", "key": 42, "value": {"c1": 4}}], + "outputs": [{"topic": "OUTPUT", "key": 42, "value": {"C1": 4}}], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, `C1` BIGINT"} + ] + } + }, { "name": "validate with elements OK", "format": ["JSON", "PROTOBUF"], @@ -258,7 +283,7 @@ }, { "name": "validate boolean elements OK", - "format": ["JSON", "AVRO", "PROTOBUF"], + "format": ["JSON", "JSON_SR", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 BOOLEAN) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -288,7 +313,7 @@ }, { "name": "validate int elements OK", - "format": ["JSON", "AVRO", "PROTOBUF"], + "format": ["JSON", "JSON_SR", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 INT) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -318,7 +343,7 @@ }, { "name": "validate bigint elements OK", - "format": ["JSON", "AVRO", "PROTOBUF"], + "format": ["JSON", "JSON_SR", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 BIGINT) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -348,7 +373,7 @@ }, 
{ "name": "validate double elements OK", - "format": ["JSON", "AVRO", "PROTOBUF"], + "format": ["JSON", "JSON_SR", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 DOUBLE) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -356,6 +381,16 @@ "inputs": [{"topic": "input", "value": {"V0": 10.1}}], "outputs": [{"topic": "OUTPUT", "value": {"V0": 10.1}}] }, + { + "name": "validate decimal elements OK", + "format": ["JSON", "JSON_SR"], + "statements": [ + "CREATE STREAM INPUT (V0 DECIMAL(15,14)) WITH (kafka_topic='input', value_format='{FORMAT}');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [{"topic": "input", "value": {"V0": 1.12345678901234}}], + "outputs": [{"topic": "OUTPUT", "value": {"V0": 1.12345678901234}}] + }, { "name": "validate string elements OK", "format": ["DELIMITED", "KAFKA"], @@ -368,7 +403,7 @@ }, { "name": "validate string elements OK", - "format": ["JSON", "AVRO", "PROTOBUF"], + "format": ["JSON", "JSON_SR", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 STRING) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -401,7 +436,7 @@ }, { "name": "validate array element OK", - "format": ["JSON", "AVRO", "PROTOBUF"], + "format": ["JSON", "JSON_SR", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 ARRAY) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -467,7 +502,7 @@ }, { "name": "validate struct element OK", - "format": ["JSON", "AVRO", "PROTOBUF"], + "format": ["JSON", "JSON_SR", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 STRUCT) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java index 18c5f793ad97..5954e24fb645 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java @@ -18,6 +18,7 @@ import io.confluent.ksql.serde.avro.AvroFormat; import io.confluent.ksql.serde.delimited.DelimitedFormat; import io.confluent.ksql.serde.json.JsonFormat; +import io.confluent.ksql.serde.json.JsonSchemaFormat; import io.confluent.ksql.serde.kafka.KafkaFormat; import io.confluent.ksql.serde.protobuf.ProtobufFormat; import io.confluent.ksql.util.KsqlException; @@ -29,6 +30,7 @@ public final class FormatFactory { public static final Format AVRO = new AvroFormat(); public static final Format JSON = new JsonFormat(); + public static final Format JSON_SR = new JsonSchemaFormat(); public static final Format PROTOBUF = new ProtobufFormat(); public static final Format KAFKA = new KafkaFormat(); public static final Format DELIMITED = new DelimitedFormat(); @@ -52,6 +54,7 @@ public static Format fromName(final String name) { switch (name) { case AvroFormat.NAME: return AVRO; case JsonFormat.NAME: return JSON; + case JsonSchemaFormat.NAME: return JSON_SR; case ProtobufFormat.NAME: return PROTOBUF; case KafkaFormat.NAME: return KAFKA; case DelimitedFormat.NAME: return DELIMITED; diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonFormat.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonFormat.java index 8a5a9b6803d9..c0dd51f303ef 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonFormat.java +++ 
b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonFormat.java @@ -55,6 +55,6 @@ public ParsedSchema toParsedSchema(final Schema schema) { @Override public KsqlSerdeFactory getSerdeFactory(final FormatInfo info) { - return new KsqlJsonSerdeFactory(); + return new KsqlJsonSerdeFactory(false); } } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaFormat.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaFormat.java new file mode 100644 index 000000000000..40c8034f34ed --- /dev/null +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonSchemaFormat.java @@ -0,0 +1,60 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.serde.json; + +import io.confluent.connect.json.JsonSchemaData; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KsqlSerdeFactory; +import org.apache.kafka.connect.data.Schema; + +public class JsonSchemaFormat implements Format { + + public static final String NAME = "JSON_SR"; + + private final JsonSchemaData jsonData; + + public JsonSchemaFormat() { + this.jsonData = new JsonSchemaData(); + } + + @Override + public String name() { + return NAME; + } + + @Override + public boolean supportsSchemaInference() { + return true; + } + + @Override + public Schema toConnectSchema(final ParsedSchema schema) { + return jsonData.toConnectSchema((JsonSchema) schema); + } + + @Override + public ParsedSchema toParsedSchema(final Schema schema) { + return jsonData.fromConnectSchema(schema); + } + + @Override + public KsqlSerdeFactory getSerdeFactory(final FormatInfo info) { + return new KsqlJsonSerdeFactory(true); + } +} diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonSerdeUtils.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonSerdeUtils.java index 6563107d6e55..07ffb51c05cc 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonSerdeUtils.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/JsonSerdeUtils.java @@ -16,6 +16,7 @@ package io.confluent.ksql.serde.json; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.NumericNode; import com.fasterxml.jackson.databind.node.TextNode; @@ -23,14 +24,64 @@ import io.confluent.ksql.schema.connect.SchemaWalker.Visitor; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; +import io.confluent.ksql.util.KsqlException; +import java.io.IOException; +import java.io.InputStream; +import javax.annotation.Nonnull; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; -final class JsonSerdeUtils { +public final class JsonSerdeUtils { + + // the 
JsonSchemaConverter adds a magic NULL byte and 4 bytes for the + // schema ID at the start of the message + private static final int SIZE_OF_SR_PREFIX = Byte.BYTES + Integer.BYTES; private JsonSerdeUtils() { } + /** + * Converts {@code jsonWithMagic} into an {@link InputStream} that represents + * standard JSON encoding. + * + * @param jsonWithMagic the serialized JSON + * @return the corresponding input stream + * @throws io.confluent.ksql.util.KsqlException If the input is not encoded + * using the schema registry format (first byte magic byte, then + * four bytes for the schemaID). + */ + public static T readJsonSR( + @Nonnull final byte[] jsonWithMagic, + final ObjectMapper mapper, + final Class clazz + ) throws IOException { + if (!hasMagicByte(jsonWithMagic)) { + // don't log contents of jsonWithMagic to avoid leaking data into the logs + throw new KsqlException( + "Got unexpected JSON serialization format that did not start with the magic byte. If " + + "this stream was not serialized using the JsonSchemaConverter, then make sure " + + "the stream is declared with JSON format (not JSON_SR)."); + } + + return mapper.readValue( + jsonWithMagic, + SIZE_OF_SR_PREFIX, + jsonWithMagic.length - SIZE_OF_SR_PREFIX, + clazz + ); + } + + /** + * @param json the serialized JSON + * @return whether or not this JSON contains the magic schema registry byte + */ + static boolean hasMagicByte(@Nonnull final byte[] json) { + // (https://tools.ietf.org/html/rfc7159#section-2) valid JSON should not + // start with 0x00 - the only "insignificant" characters allowed are + // 0x20, 0x09, 0x0A and 0x0D + return json.length > 0 && json[0] == 0x00; + } + static PersistenceSchema validateSchema(final PersistenceSchema schema) { class SchemaValidator implements Visitor { diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java index bdebce895ce8..ca48223b72a0 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java @@ -75,12 +75,15 @@ public class KsqlJsonDeserializer implements Deserializer { .build(); private final PersistenceSchema physicalSchema; + private final boolean isJsonSchema; private String target = "?"; public KsqlJsonDeserializer( - final PersistenceSchema physicalSchema + final PersistenceSchema physicalSchema, + final boolean isJsonSchema ) { this.physicalSchema = JsonSerdeUtils.validateSchema(physicalSchema); + this.isJsonSchema = isJsonSchema; } @Override @@ -91,8 +94,15 @@ public void configure(final Map map, final boolean isKey) { @Override public Object deserialize(final String topic, final byte[] bytes) { try { - final JsonNode value = bytes == null - ? null + if (bytes == null) { + return null; + } + + // don't use the JsonSchemaConverter to read this data because + // we require that the MAPPER enables USE_BIG_DECIMAL_FOR_FLOATS, + // which is not currently available in the standard converters + final JsonNode value = isJsonSchema + ? 
JsonSerdeUtils.readJsonSR(bytes, MAPPER, JsonNode.class) : MAPPER.readTree(bytes); final Object coerced = enforceFieldType( diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java index 36fc3a1aadaf..da790e7205e6 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerdeFactory.java @@ -15,18 +15,40 @@ package io.confluent.ksql.serde.json; +import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.Immutable; +import io.confluent.connect.json.JsonSchemaConverter; +import io.confluent.connect.json.JsonSchemaConverterConfig; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.serde.KsqlSerdeFactory; +import io.confluent.ksql.serde.connect.ConnectDataTranslator; +import io.confluent.ksql.serde.connect.KsqlConnectSerializer; +import io.confluent.ksql.serde.tls.ThreadLocalSerializer; import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; import java.util.function.Supplier; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.json.DecimalFormat; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.Converter; @Immutable public class KsqlJsonSerdeFactory implements KsqlSerdeFactory { + private final boolean useSchemaRegistryFormat; + + /** + * @param useSchemaRegistryFormat whether or not to require the magic byte and + * schemaID as part of the JSON message + */ + public KsqlJsonSerdeFactory(final boolean useSchemaRegistryFormat) { + this.useSchemaRegistryFormat = useSchemaRegistryFormat; + } + @Override public void validate(final PersistenceSchema schema) { // Supports all types @@ -38,9 +60,62 @@ public Serde createSerde( final KsqlConfig ksqlConfig, final Supplier schemaRegistryClientFactory ) { + final Supplier> serializer = () -> createSerializer( + schema, + ksqlConfig, + schemaRegistryClientFactory + ); + + // Sanity check: + serializer.get(); + return Serdes.serdeFrom( - new KsqlJsonSerializer(schema), - new KsqlJsonDeserializer(schema) + new ThreadLocalSerializer<>(serializer), + new KsqlJsonDeserializer(schema, useSchemaRegistryFormat) ); } + + private KsqlConnectSerializer createSerializer( + final PersistenceSchema schema, + final KsqlConfig ksqlConfig, + final Supplier schemaRegistryClientFactory + ) { + final Converter converter = useSchemaRegistryFormat + ? 
getSchemaConverter(schemaRegistryClientFactory.get(), ksqlConfig) + : getConverter(); + + return new KsqlConnectSerializer( + schema.serializedSchema(), + new ConnectDataTranslator(schema.serializedSchema()), + converter + ); + } + + private Converter getConverter() { + final JsonConverter converter = new JsonConverter(); + converter.configure(ImmutableMap.of( + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false, + JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name() + ), false); + return converter; + } + + private Converter getSchemaConverter( + final SchemaRegistryClient schemaRegistryClient, + final KsqlConfig ksqlConfig + ) { + final Map config = ksqlConfig + .originalsWithPrefix(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX); + + config.put( + JsonSchemaConverterConfig.SCHEMA_REGISTRY_URL_CONFIG, + ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY) + ); + config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()); + + final Converter converter = new JsonSchemaConverter(schemaRegistryClient); + converter.configure(config, false); + + return converter; + } } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerializer.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerializer.java deleted file mode 100644 index c46eacfb014b..000000000000 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonSerializer.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.serde.json; - -import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.schema.ksql.PersistenceSchema; -import java.util.Map; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.connect.json.DecimalFormat; -import org.apache.kafka.connect.json.JsonConverter; -import org.apache.kafka.connect.json.JsonConverterConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KsqlJsonSerializer implements Serializer { - - private static final Logger LOG = LoggerFactory.getLogger(KsqlJsonSerializer.class); - - private final PersistenceSchema physicalSchema; - private final JsonConverter converter; - - public KsqlJsonSerializer(final PersistenceSchema physicalSchema) { - this.converter = new JsonConverter(); - this.converter.configure(ImmutableMap.of( - JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false, - JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name() - ), false); - this.physicalSchema = JsonSerdeUtils.validateSchema(physicalSchema); - } - - @Override - public void configure(final Map props, final boolean isKey) { - } - - @Override - public byte[] serialize(final String topic, final Object data) { - if (LOG.isTraceEnabled()) { - LOG.trace("Serializing row. 
topic:{}, row:{}", topic, data); - } - - if (data == null) { - return null; - } - - try { - return converter.fromConnectData(topic, physicalSchema.serializedSchema(), data); - } catch (final Exception e) { - throw new SerializationException("Error serializing JSON message for topic: " + topic, e); - } - } - - @Override - public void close() { - } -} \ No newline at end of file diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/JsonSerdeUtilsTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/JsonSerdeUtilsTest.java index b8b35629c32f..b35cee56cbfd 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/JsonSerdeUtilsTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/JsonSerdeUtilsTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import io.confluent.ksql.schema.ksql.PersistenceSchema; @@ -34,12 +35,20 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class JsonSerdeUtilsTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); + @Mock + private ObjectMapper mapper; + @Test public void shouldConvertToBooleanCorrectly() { final Boolean b = JsonSerdeUtils.toBoolean(BooleanNode.TRUE); @@ -237,6 +246,30 @@ public void shouldThrowOnNestedMapWithNoneStringKeys() { )); } + @Test + public void shouldSetCorrectOffsetWithMagicByte() throws IOException { + // Given: + byte[] json = new byte[]{/* magic */ 0x00, /* id */ 0x00, 0x00, 0x00, 0x01, /* data */ 0x01}; + + // When: + JsonSerdeUtils.readJsonSR(json, mapper, Object.class); + + // Then: + Mockito.verify(mapper, Mockito.times(1)).readValue(json, 5, 1, Object.class); + } + + @Test() + public void shouldThrowOnStandardJsonConversion() throws IOException { + // Given: + byte[] json = new byte[]{/* data */ 0x01}; + + // Expect: + expectedException.expectMessage("Got unexpected JSON serialization format that did not start with the magic byte"); + + // When: + JsonSerdeUtils.readJsonSR(json, mapper, Object.class); + } + private static PersistenceSchema persistenceSchema(final Schema schema) { return PersistenceSchema.from( (ConnectSchema) SchemaBuilder.struct() diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java index 7117b6a1e0ed..f285d622ff5e 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java @@ -27,6 +27,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.schema.ksql.PersistenceSchema; @@ -34,6 +37,7 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import 
java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -51,9 +55,11 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; -@RunWith(MockitoJUnitRunner.class) +@RunWith(Parameterized.class) public class KsqlJsonDeserializerTest { private static final String SOME_TOPIC = "bob"; @@ -97,6 +103,14 @@ public class KsqlJsonDeserializerTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{{false}, {true}}); + } + + @Parameter + public boolean useSchemas; + private Struct expectedOrder; private PersistenceSchema persistenceSchema; private KsqlJsonDeserializer deserializer; @@ -161,7 +175,7 @@ public void shouldCoerceFieldValues() { @Test public void shouldThrowIfNotAnObject() { // Given: - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expect(SerializationException.class); @@ -279,7 +293,7 @@ public void shouldCreateJsonStringForStructIfDefinedAsVarchar() { + "}").getBytes(StandardCharsets.UTF_8); // When: - final Struct result = (Struct) deserializer.deserialize(SOME_TOPIC, bytes); + final Struct result = (Struct) deserializer.deserialize(SOME_TOPIC, addMagic(bytes)); // Then: assertThat(result.schema(), is(persistenceSchema.ksqlSchema())); @@ -292,7 +306,7 @@ public void shouldDeserializedJsonBoolean() { // Given: givenDeserializerForSchema(Schema.OPTIONAL_BOOLEAN_SCHEMA); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // When: final Object result = deserializer.deserialize(SOME_TOPIC, bytes); @@ -306,7 +320,7 @@ public void shouldThrowIfCanNotCoerceToBoolean() { // Given: givenDeserializerForSchema(Schema.OPTIONAL_BOOLEAN_SCHEMA); - final byte[] bytes = "24".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(IntNode.valueOf(23)); // Then: expectedException.expect(SerializationException.class); @@ -330,7 +344,7 @@ public void shouldDeserializedJsonNumberAsInt() { validCoercions.forEach(value -> { - final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] bytes = addMagic(value.getBytes(StandardCharsets.UTF_8)); // When: final Object result = deserializer.deserialize(SOME_TOPIC, bytes); @@ -345,7 +359,7 @@ public void shouldThrowIfCanNotCoerceToInt() { // Given: givenDeserializerForSchema(Schema.OPTIONAL_INT32_SCHEMA); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expect(SerializationException.class); @@ -369,7 +383,7 @@ public void shouldDeserializedJsonNumberAsBigInt() { validCoercions.forEach(value -> { - final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] bytes = addMagic(value.getBytes(StandardCharsets.UTF_8)); // When: final Object result = deserializer.deserialize(SOME_TOPIC, bytes); @@ -385,7 +399,7 @@ public void shouldThrowIfCanNotCoerceToBigInt() { // Given: givenDeserializerForSchema(Schema.OPTIONAL_INT64_SCHEMA); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = 
serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expect(SerializationException.class); @@ -410,7 +424,7 @@ public void shouldDeserializedJsonNumberAsDouble() { validCoercions.forEach(value -> { - final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] bytes = addMagic(value.getBytes(StandardCharsets.UTF_8)); // When: final Object result = deserializer.deserialize(SOME_TOPIC, bytes); @@ -425,7 +439,7 @@ public void shouldThrowIfCanNotCoerceToDouble() { // Given: givenDeserializerForSchema(Schema.OPTIONAL_FLOAT64_SCHEMA); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expect(SerializationException.class); @@ -453,7 +467,7 @@ public void shouldDeserializedJsonText() { validCoercions.forEach((jsonValue, expectedValue) -> { - final byte[] bytes = jsonValue.getBytes(StandardCharsets.UTF_8); + final byte[] bytes = addMagic(jsonValue.getBytes(StandardCharsets.UTF_8)); // When: final Object result = deserializer.deserialize(SOME_TOPIC, bytes); @@ -475,7 +489,7 @@ public void shouldDeserializedJsonNumberAsBigDecimal() { validCoercions.forEach(value -> { - final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] bytes = addMagic(value.getBytes(StandardCharsets.UTF_8)); // When: final Object result = deserializer.deserialize(SOME_TOPIC, bytes); @@ -490,7 +504,7 @@ public void shouldThrowIfCanNotCoerceToBigDecimal() { // Given: givenDeserializerForSchema(DecimalUtil.builder(20, 19).build()); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expect(SerializationException.class); @@ -526,7 +540,7 @@ public void shouldThrowIfNotAnArray() { .build() ); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expect(SerializationException.class); @@ -583,7 +597,7 @@ public void shouldThrowIfNotAnMap() { .build() ); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expect(SerializationException.class); @@ -632,7 +646,7 @@ public void shouldThrowOnMapSchemaWithNonStringKeys() { expectedException.expectMessage("Only MAPs with STRING keys are supported"); // When: - new KsqlJsonDeserializer(physicalSchema); + new KsqlJsonDeserializer(physicalSchema, false); } @Test @@ -657,7 +671,7 @@ public void shouldThrowOnNestedMapSchemaWithNonStringKeys() { expectedException.expectMessage("Only MAPs with STRING keys are supported"); // When: - new KsqlJsonDeserializer(physicalSchema); + new KsqlJsonDeserializer(physicalSchema, false); } @Test @@ -698,7 +712,7 @@ public void shouldIncludePathForErrorsInRootNode() { // Given: givenDeserializerForSchema(Schema.OPTIONAL_FLOAT64_SCHEMA); - final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + final byte[] bytes = serializeJson(BooleanNode.valueOf(true)); // Then: expectedException.expectCause(hasMessage(endsWith(", path: $"))); @@ -770,14 +784,22 @@ private void givenDeserializerForSchema(final Schema serializedSchema) { this.persistenceSchema = PersistenceSchema .from((ConnectSchema) ksqlSchema, unwrap); - deserializer = new KsqlJsonDeserializer(persistenceSchema); + deserializer = new KsqlJsonDeserializer(persistenceSchema, useSchemas); } - private static byte[] serializeJson(final Object 
expected) { + private byte[] serializeJson(final Object expected) { try { - return OBJECT_MAPPER.writeValueAsBytes(expected); + return addMagic(OBJECT_MAPPER.writeValueAsBytes(expected)); } catch (final JsonProcessingException e) { throw new RuntimeException(e); } } + + private byte[] addMagic(final byte[] json) { + if (useSchemas) { + return ArrayUtils.addAll(new byte[]{/*magic*/ 0x00, /*schema*/ 0x00, 0x00, 0x00, 0x01}, json); + } else { + return json; + } + } } diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerializerTest.java index 151f9b2afb71..93716d5ad48f 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerializerTest.java @@ -28,16 +28,22 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.schema.ksql.PersistenceSchema; +import io.confluent.ksql.util.DecimalUtil; +import io.confluent.ksql.util.KsqlConfig; import java.io.IOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -50,9 +56,11 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; -@RunWith(MockitoJUnitRunner.class) +@RunWith(Parameterized.class) public class KsqlJsonSerializerTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -124,10 +132,22 @@ public class KsqlJsonSerializerTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); - private KsqlJsonSerializer serializer; + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{{false}, {true}}); + } + + @Parameter + public boolean useSchemas; + + private KsqlConfig config; + private SchemaRegistryClient srClient; + private Serializer serializer; @Before public void before() { + config = new KsqlConfig(ImmutableMap.of()); + srClient = new MockSchemaRegistryClient(); givenSerializerForSchema(ORDER_SCHEMA); } @@ -159,6 +179,10 @@ public void shouldSerializeStructCorrectly() { final byte[] bytes = serializer.serialize(SOME_TOPIC, struct); // Then: + final String mapCol = useSchemas + ? 
"[{\"key\":\"key1\",\"value\":100.0}]" + : "{\"key1\":100.0}"; + assertThat(asJsonString(bytes), equalTo( "{" + "\"ORDERTIME\":1511897796092," @@ -166,7 +190,7 @@ public void shouldSerializeStructCorrectly() { + "\"ITEMID\":\"item_1\"," + "\"ORDERUNITS\":10.0," + "\"ARRAYCOL\":[100.0]," - + "\"MAPCOL\":{\"key1\":100.0}," + + "\"MAPCOL\":" + mapCol + "," + "\"DECIMALCOL\":1.12345" + "}")); } @@ -192,7 +216,10 @@ public void shouldHandleNestedStruct() throws IOException { final byte[] bytes = serializer.serialize(SOME_TOPIC, struct); // Then: - final JsonNode jsonNode = OBJECT_MAPPER.readTree(bytes); + final JsonNode jsonNode = useSchemas + ? JsonSerdeUtils.readJsonSR(bytes, OBJECT_MAPPER, JsonNode.class) + : OBJECT_MAPPER.readTree(bytes); + assertThat(jsonNode.size(), equalTo(7)); assertThat(jsonNode.get("ordertime").asLong(), is(1234567L)); assertThat(jsonNode.get("itemid").get("NAME").asText(), is("Item_10")); @@ -288,6 +315,18 @@ public void shouldSerializeDouble() { assertThat(asJsonString(bytes), is("62.0")); } + @Test + public void shouldSerializeDecimal() { + // Given: + givenSerializerForSchema(DecimalUtil.builder(20, 19).build()); + + // When: + final byte[] bytes = serializer.serialize(SOME_TOPIC, new BigDecimal("1.234567890123456789")); + + // Then: + assertThat(asJsonString(bytes), is("1.234567890123456789")); + } + @Test public void shouldThrowIfNotDouble() { // Given: @@ -389,7 +428,11 @@ public void shouldSerializeMap() { final byte[] bytes = serializer.serialize(SOME_TOPIC, ImmutableMap.of("a", 1, "b", 2)); // Then: - assertThat(asJsonString(bytes), is("{\"a\":1,\"b\":2}")); + if (useSchemas) { + assertThat(asJsonString(bytes), is("[{\"key\":\"a\",\"value\":1},{\"key\":\"b\",\"value\":2}]")); + } else { + assertThat(asJsonString(bytes), is("{\"a\":1,\"b\":2}")); + } } @Test @@ -462,7 +505,7 @@ public void shouldThrowOnMapSchemaWithNonStringKeys() { expectedException.expectMessage("Only MAPs with STRING keys are supported"); // When: - new KsqlJsonSerializer(physicalSchema); + new KsqlJsonSerdeFactory(false).createSerde(physicalSchema, config, () -> null); } @Test @@ -487,7 +530,7 @@ public void shouldThrowOnNestedMapSchemaWithNonStringKeys() { expectedException.expectMessage("Only MAPs with STRING keys are supported"); // When: - new KsqlJsonSerializer(physicalSchema); + new KsqlJsonSerdeFactory(false).createSerde(physicalSchema, config, () -> null); } @Test @@ -552,8 +595,12 @@ public void shouldNotIncludeBadValueInExceptionAsThatWouldBeASecurityIssue() { } } - private static String asJsonString(final byte[] bytes) { - return new String(bytes, StandardCharsets.UTF_8); + private String asJsonString(final byte[] bytes) { + if (useSchemas) { + return new String(Arrays.copyOfRange(bytes, 5, bytes.length), StandardCharsets.UTF_8); + } else { + return new String(bytes, StandardCharsets.UTF_8); + } } private static Struct buildWithNestedStruct() { @@ -602,6 +649,8 @@ private void givenSerializerForSchema(final Schema serializedSchema) { final PersistenceSchema persistenceSchema = PersistenceSchema .from((ConnectSchema) ksqlSchema, unwrap); - serializer = new KsqlJsonSerializer(persistenceSchema); + serializer = new KsqlJsonSerdeFactory(useSchemas) + .createSerde(persistenceSchema, config, () -> srClient) + .serializer(); } }