feat: introduce JSON_SR format #4596

Merged · 4 commits · Feb 26, 2020
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.json.JsonSerdeUtils;
import io.confluent.ksql.test.serde.SerdeSupplier;
import java.math.BigDecimal;
import java.util.Collection;
@@ -31,13 +32,19 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class ValueSpecJsonSerdeSupplier implements SerdeSupplier<Object> {

private static final ObjectMapper MAPPER = new ObjectMapper()
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
private final boolean useSchemas;

public ValueSpecJsonSerdeSupplier(final boolean useSchemas) {
this.useSchemas = useSchemas;
}

@Override
public Serializer<Object> getSerializer(final SchemaRegistryClient schemaRegistryClient) {
@@ -49,7 +56,7 @@ public Deserializer<Object> getDeserializer(final SchemaRegistryClient schemaReg
return new ValueSpecJsonDeserializer();
}

private static final class ValueSpecJsonSerializer implements Serializer<Object> {
private final class ValueSpecJsonSerializer implements Serializer<Object> {
@Override
public void close() {
}
@@ -65,14 +72,21 @@ public byte[] serialize(final String topicName, final Object spec) {
}
try {
final Object toSerialize = Converter.toJsonNode(spec);
return MAPPER.writeValueAsBytes(toSerialize);
final byte[] bytes = MAPPER.writeValueAsBytes(toSerialize);
if (!useSchemas) {
return bytes;
}

return ArrayUtils.addAll(
new byte[]{/*magic*/ 0x00, /*schemaID*/ 0x00, 0x00, 0x00, 0x01},
bytes);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

private static final class ValueSpecJsonDeserializer implements Deserializer<Object> {
private final class ValueSpecJsonDeserializer implements Deserializer<Object> {
@Override
public void close() {
}
@@ -87,7 +101,9 @@ public Object deserialize(final String topicName, final byte[] data) {
return null;
}
try {
return MAPPER.readValue(data, Object.class);
return useSchemas
? JsonSerdeUtils.readJsonSR(data, MAPPER, Object.class)
: MAPPER.readValue(data, Object.class);
} catch (final Exception e) {
throw new RuntimeException(e);
}
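
Note on the change above: with useSchemas enabled, the value-spec serde now speaks the Confluent Schema Registry wire format (a magic byte 0x00, a 4-byte big-endian schema id, then the plain JSON payload), and the deserializer strips the same 5-byte header via JsonSerdeUtils.readJsonSR before handing the bytes to Jackson. A minimal, self-contained sketch of that framing follows; the class and method names are invented for illustration and are not part of this PR.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.ByteBuffer;
import java.util.Collections;

public final class WireFormatSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final byte MAGIC_BYTE = 0x00;

  // Prepend the 5-byte header, mirroring the serializer's ArrayUtils.addAll call above.
  static byte[] frame(final int schemaId, final byte[] jsonBytes) {
    return ByteBuffer.allocate(5 + jsonBytes.length)
        .put(MAGIC_BYTE)
        .putInt(schemaId)      // 4-byte big-endian schema id
        .put(jsonBytes)
        .array();
  }

  // Validate the magic byte, skip the schema id, and parse the rest as plain JSON.
  static Object unframe(final byte[] data) throws Exception {
    if (data.length < 5 || data[0] != MAGIC_BYTE) {
      throw new IllegalArgumentException("Not in schema-registry wire format");
    }
    return MAPPER.readValue(data, 5, data.length - 5, Object.class);
  }

  public static void main(final String[] args) throws Exception {
    final byte[] json = MAPPER.writeValueAsBytes(
        Collections.singletonMap("V0", new int[]{1, 2, 3}));
    final byte[] framed = frame(1, json);
    System.out.println(unframe(framed));  // prints {V0=[1, 2, 3]}
  }
}

The hard-coded 0x00, 0x00, 0x00, 0x01 schema id in the serializer presumably relies on the test schema being the first one registered with the mock registry.
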
@@ -32,6 +32,7 @@
import io.confluent.ksql.serde.avro.AvroFormat;
import io.confluent.ksql.serde.delimited.DelimitedFormat;
import io.confluent.ksql.serde.json.JsonFormat;
import io.confluent.ksql.serde.json.JsonSchemaFormat;
import io.confluent.ksql.serde.kafka.KafkaFormat;
import io.confluent.ksql.serde.protobuf.ProtobufFormat;
import io.confluent.ksql.test.serde.SerdeSupplier;
@@ -66,7 +67,8 @@ public static SerdeSupplier<?> getSerdeSupplier(
switch (format.name()) {
case AvroFormat.NAME: return new ValueSpecAvroSerdeSupplier();
case ProtobufFormat.NAME: return new ValueSpecProtobufSerdeSupplier();
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier();
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true);
case DelimitedFormat.NAME: return new StringSerdeSupplier();
case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema);
default:
@@ -89,7 +91,8 @@ public static Optional<ParsedSchema> buildSchema(final JsonNode schema, final St
new AvroFormat()
.toParsedSchema(new AvroData(1).toConnectSchema(avroSchema))
);
} else if (format.equalsIgnoreCase(JsonFormat.NAME)) {
} else if (format.equalsIgnoreCase(JsonFormat.NAME)
|| format.equalsIgnoreCase(JsonSchemaFormat.NAME)) {
final String schemaString = OBJECT_MAPPER.writeValueAsString(schema);
return Optional.of(new JsonSchema(schemaString));
} else if (format.equalsIgnoreCase(ProtobufFormat.NAME)) {
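
The new JsonSchemaFormat.NAME (JSON_SR) case reuses the plain-JSON value-spec serde with schema-registry framing switched on, and buildSchema now emits a JsonSchema for JSON_SR as well, so the harness has something to register. Below is a hedged sketch of what that registration could look like against a mock registry; the subject name, the example schema string, and the use of the ParsedSchema-based register overload are assumptions, not code from this PR.

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;

public final class JsonSrRegistrationSketch {

  public static void main(final String[] args) throws Exception {
    final SchemaRegistryClient srClient = new MockSchemaRegistryClient();

    // JSON schema for the test stream's value: one ARRAY<INTEGER> column named V0.
    final JsonSchema schema = new JsonSchema(
        "{\"type\":\"object\",\"properties\":"
            + "{\"V0\":{\"type\":\"array\",\"items\":{\"type\":\"integer\"}}}}");

    // Subject name assumes the default TopicNameStrategy for the "input" topic.
    final int id = srClient.register("input-value", schema);
    System.out.println("registered schema id: " + id);  // 1 on a fresh mock client
  }
}
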
@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1582741261090,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (V0 ARRAY<INTEGER>) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON_SR');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `V0` ARRAY<INTEGER>",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `V0` ARRAY<INTEGER>",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `V0` ARRAY<INTEGER>"
},
"selectExpressions" : [ "V0 AS V0" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<V0 ARRAY<INT>> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<V0 ARRAY<INT>> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7938322338186257046",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project
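
The generated topology is a straight source, transformValues, project, sink pipeline for the CSAS_OUTPUT_0 query in the plan above. Roughly, it corresponds to the following bare Kafka Streams program; this is illustrative only, with String serdes and an identity mapValues standing in for ksqlDB's generated serdes and select logic.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public final class TopologySketch {

  public static Topology build() {
    final StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream("input", Consumed.with(Serdes.String(), Serdes.String()))  // KSTREAM-SOURCE-0000000000
        .mapValues(value -> value)                                         // TRANSFORMVALUES + Project (identity select)
        .to("OUTPUT", Produced.with(Serdes.String(), Serdes.String()));    // KSTREAM-SINK-0000000003
    return builder.build();
  }

  public static void main(final String[] args) {
    System.out.println(build().describe());
  }
}
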
