Skip to content

Commit

Permalink
refactor: logicalSchema (#3315)
Browse files Browse the repository at this point in the history
- internally use `SqlStruct` rather than `List<Field>`
- as this allows us to expose `SqlStruct` schemas for metadata, key and value
- also remove old factory methods that take Connect `Schema`, which were only used in tests. Replace them with the builder.
  • Loading branch information
big-andy-coates authored Sep 10, 2019
1 parent 8917e48 commit 3e36dc0
Show file tree
Hide file tree
Showing 87 changed files with 1,031 additions and 1,299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void setUp() throws Exception {

final Pair<Struct, GenericRow> genericRowPair = rowGenerator.generateRow();
row = genericRowPair.getRight();
schema = rowGenerator.schema().valueSchema();
schema = rowGenerator.schema().valueConnectSchema();
}

private InputStream getSchemaStream() {
Expand Down
31 changes: 13 additions & 18 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.test.util.KsqlIdentifierTestUtil;
Expand All @@ -84,8 +85,6 @@
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpStatus.Code;
Expand Down Expand Up @@ -534,14 +533,11 @@ public void testSelectProject() {
new Double[]{1100.0, 1110.99, 970.0})));

final PhysicalSchema resultSchema = PhysicalSchema.from(
LogicalSchema.of(SchemaBuilder.struct()
.field("ITEMID", SchemaBuilder.OPTIONAL_STRING_SCHEMA)
.field("ORDERUNITS", SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA)
.field("PRICEARRAY", SchemaBuilder
.array(SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA)
.optional()
.build())
.build()),
LogicalSchema.builder()
.valueField("ITEMID", SqlTypes.STRING)
.valueField("ORDERUNITS", SqlTypes.DOUBLE)
.valueField("PRICEARRAY", SqlTypes.array(SqlTypes.DOUBLE))
.build(),
SerdeOption.none()
);

Expand Down Expand Up @@ -640,15 +636,14 @@ public void testSelectUDFs() {
orderDataProvider.kstreamName()
);

final Schema sourceSchema = orderDataProvider.schema().logicalSchema().valueSchema();
final PhysicalSchema resultSchema = PhysicalSchema.from(
LogicalSchema.of(SchemaBuilder.struct()
.field("ITEMID", sourceSchema.field("ITEMID").schema())
.field("COL1", sourceSchema.field("ORDERUNITS").schema())
.field("COL2", sourceSchema.field("PRICEARRAY").schema().valueSchema())
.field("COL3", sourceSchema.field("KEYVALUEMAP").schema().valueSchema())
.field("COL4", SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA)
.build()),
LogicalSchema.builder()
.valueField("ITEMID", SqlTypes.STRING)
.valueField("COL1", SqlTypes.DOUBLE)
.valueField("COL2", SqlTypes.DOUBLE)
.valueField("COL3", SqlTypes.DOUBLE)
.valueField("COL4", SqlTypes.BOOLEAN)
.build(),
SerdeOption.none()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@
import io.confluent.ksql.rest.entity.TypeList;
import io.confluent.ksql.rest.util.EntityUtil;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -79,8 +82,6 @@
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState;
Expand Down Expand Up @@ -108,7 +109,7 @@ public class ConsoleTest {
"TestSource",
Collections.emptyList(),
Collections.emptyList(),
buildTestSchema(Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
buildTestSchema(SqlTypes.INTEGER, SqlTypes.STRING),
DataSourceType.KTABLE.getKsqlType(),
"key",
"2000-01-01",
Expand Down Expand Up @@ -304,23 +305,15 @@ public void testPrintQueries() throws IOException {
public void testPrintSourceDescription() throws IOException {
// Given:
final List<FieldInfo> fields = buildTestSchema(
Schema.OPTIONAL_BOOLEAN_SCHEMA,
Schema.OPTIONAL_INT32_SCHEMA,
Schema.OPTIONAL_INT64_SCHEMA,
Schema.OPTIONAL_FLOAT64_SCHEMA,
Schema.OPTIONAL_STRING_SCHEMA,
SchemaBuilder
.array(Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build(),
SchemaBuilder
.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA)
.optional()
.build(),
SchemaBuilder
.struct()
.field("a", Schema.OPTIONAL_FLOAT64_SCHEMA)
.optional()
SqlTypes.BOOLEAN,
SqlTypes.INTEGER,
SqlTypes.BIGINT,
SqlTypes.DOUBLE,
SqlTypes.STRING,
SqlTypes.array(SqlTypes.STRING),
SqlTypes.map(SqlTypes.BIGINT),
SqlTypes.struct()
.field("a", SqlTypes.DOUBLE)
.build()
);

Expand Down Expand Up @@ -971,7 +964,7 @@ public void shouldPrintTopicDescribeExtended() throws IOException {
"TestSource",
readQueries,
writeQueries,
buildTestSchema(Schema.OPTIONAL_STRING_SCHEMA),
buildTestSchema(SqlTypes.STRING),
DataSourceType.KTABLE.getKsqlType(),
"key",
"2000-01-01",
Expand Down Expand Up @@ -1387,13 +1380,15 @@ public void shouldSwallowCliCommandLinesEvenWithWhiteSpace() {
assertThat(result, is("not a CLI command;"));
}

private static List<FieldInfo> buildTestSchema(final Schema... fieldTypes) {
final SchemaBuilder dataSourceBuilder = SchemaBuilder.struct().name("TestSchema");
private static List<FieldInfo> buildTestSchema(final SqlType... fieldTypes) {
final Builder schemaBuilder = LogicalSchema.builder();

for (int idx = 0; idx < fieldTypes.length; idx++) {
dataSourceBuilder.field("f_" + idx, fieldTypes[idx]);
schemaBuilder.valueField("f_" + idx, fieldTypes[idx]);
}

return EntityUtil.buildSourceSchemaEntity(LogicalSchema.of(dataSourceBuilder.build()), false);
final LogicalSchema schema = schemaBuilder.build();

return EntityUtil.buildSourceSchemaEntity(schema, false);
}
}
Loading

0 comments on commit 3e36dc0

Please sign in to comment.