Skip to content

Commit

Permalink
feat: implement serdes for bytes (#7791)
Browse files Browse the repository at this point in the history
* feat: implement serdes for bytes

* update historical plans

* improve json deserializer error message

* fix build

* clean up a test

* use static base64 de/encoders
  • Loading branch information
Zara Lim authored Jul 16, 2021
1 parent 1235ac0 commit 2ae4cae
Show file tree
Hide file tree
Showing 40 changed files with 1,749 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,20 +279,22 @@ public void shouldIgnoreFixed() {
.nullableString("STRING", "bar")
.endRecord(),
SchemaBuilder.struct()
.field("FIXED_FIELD", Schema.OPTIONAL_BYTES_SCHEMA)
.field("STRING", Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build()
);
}

@Test
public void shouldIgnoreBytes() {
public void shouldInferBytes() {
shouldInferType(
org.apache.avro.SchemaBuilder.record("foo").fields()
.nullableBytes("bytes", new byte[]{})
.nullableString("STRING", "bar")
.endRecord(),
SchemaBuilder.struct()
.field("BYTES", Schema.OPTIONAL_BYTES_SCHEMA)
.field("STRING", Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public class SqlToJavaVisitor {
"java.sql.Time",
"java.sql.Date",
"java.sql.Timestamp",
"java.nio.ByteBuffer",
"java.util.Arrays",
"java.util.HashMap",
"java.util.Map",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.test.serde;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand All @@ -25,6 +27,7 @@
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -197,8 +200,9 @@ private Object specToConnect(final Object spec, final Schema schema) {
}

throw new TestFrameworkException("DECIMAL type requires JSON number in test data");
} else {
return spec.toString().getBytes(UTF_8);
}
throw new RuntimeException("Unexpected BYTES type " + schema.name());
default:
throw new RuntimeException(
"This test does not support the data type yet: " + schema.type());
Expand Down Expand Up @@ -283,8 +287,14 @@ private Object connectToSpec(
if (data instanceof BigDecimal) {
return data;
}
throw new RuntimeException("Unexpected BYTES type " + schema.name());
} else {
if (data instanceof byte[]) {
return ByteBuffer.wrap((byte[]) data);
} else {
return data;
}
}
throw new RuntimeException("Unexpected BYTES type " + schema.name());
default:
throw new RuntimeException("Test cannot handle data of type: " + schema.type());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (EXPECTED INTEGER, C1 BYTES, C2 BYTES) WITH (KAFKA_TOPIC='input', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=1);",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`EXPECTED` INTEGER, `C1` BYTES, `C2` BYTES",
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`EXPECTED` INTEGER, `C1` BYTES, `C2` BYTES",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"sourceSchema" : "`EXPECTED` INTEGER, `C1` BYTES, `C2` BYTES"
},
"selectExpressions" : [ "EXPECTED AS EXPECTED", "C1 AS C1", "C2 AS C2" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
{
"version" : "7.1.0",
"timestamp" : 1626301943478,
"path" : "query-validation-tests/avro.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
"schema" : "`EXPECTED` INTEGER, `C1` BYTES, `C2` BYTES",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"CSAS_OUTPUT_0.OUTPUT" : {
"schema" : "`EXPECTED` INTEGER, `C1` BYTES, `C2` BYTES",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
}
},
"testCase" : {
"name" : "should not filter out bytes",
"inputs" : [ {
"topic" : "input",
"key" : null,
"value" : {
"expected" : 1,
"c1" : null,
"c2" : null,
"c3" : null
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : null,
"value" : {
"EXPECTED" : 1,
"c1" : null,
"c2" : null
}
} ],
"topics" : [ {
"name" : "input",
"valueSchema" : {
"type" : "record",
"name" : "blah",
"fields" : [ {
"name" : "expected",
"type" : "int"
}, {
"name" : "c1",
"type" : [ "null", "bytes" ]
}, {
"name" : "c2",
"type" : [ "null", {
"type" : "fixed",
"name" : "md5",
"size" : 16
} ]
} ]
},
"valueFormat" : "AVRO",
"replicas" : 1,
"numPartitions" : 1
}, {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT WITH (kafka_topic='input', value_format='AvRo');", "CREATE STREAM OUTPUT AS SELECT * FROM input;" ],
"post" : {
"sources" : [ {
"name" : "INPUT",
"type" : "STREAM",
"schema" : "`EXPECTED` INTEGER, `C1` BYTES, `C2` BYTES",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "OUTPUT",
"type" : "STREAM",
"schema" : "`EXPECTED` INTEGER, `C1` BYTES, `C2` BYTES",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "input",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 1,
"valueSchema" : {
"type" : "record",
"name" : "blah",
"fields" : [ {
"name" : "expected",
"type" : "int"
}, {
"name" : "c1",
"type" : [ "null", "bytes" ],
"default" : null
}, {
"name" : "c2",
"type" : [ "null", {
"type" : "fixed",
"name" : "md5",
"size" : 16,
"connect.parameters" : {
"connect.fixed.size" : "16"
},
"connect.name" : "md5"
} ],
"default" : null
} ],
"connect.name" : "blah"
}
}, {
"name" : "OUTPUT",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 4,
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "EXPECTED",
"type" : [ "null", "int" ],
"default" : null
}, {
"name" : "C1",
"type" : [ "null", "bytes" ],
"default" : null
}, {
"name" : "C2",
"type" : [ "null", "bytes" ],
"default" : null
} ]
}
} ]
}
}
}
}
Loading

0 comments on commit 2ae4cae

Please sign in to comment.