Skip to content

Commit

Permalink
fix: /inserts-stream endpoint now supports nested types (#5621)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Jun 15, 2020
1 parent 35dd8e1 commit 866ae34
Show file tree
Hide file tree
Showing 10 changed files with 762 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@
import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -91,9 +93,9 @@ public class ClientIntegrationTest {
private static final String TEST_STREAM = TEST_DATA_PROVIDER.kstreamName();
private static final int TEST_NUM_ROWS = TEST_DATA_PROVIDER.data().size();
private static final List<String> TEST_COLUMN_NAMES =
ImmutableList.of("STR", "LONG", "DEC", "ARRAY", "MAP");
ImmutableList.of("STR", "LONG", "DEC", "ARRAY", "MAP", "STRUCT", "COMPLEX");
private static final List<ColumnType> TEST_COLUMN_TYPES =
RowUtil.columnTypesFromStrings(ImmutableList.of("STRING", "BIGINT", "DECIMAL", "ARRAY", "MAP"));
RowUtil.columnTypesFromStrings(ImmutableList.of("STRING", "BIGINT", "DECIMAL", "ARRAY", "MAP", "STRUCT", "STRUCT"));
private static final List<KsqlArray> TEST_EXPECTED_ROWS = convertToClientRows(
TEST_DATA_PROVIDER.data());

Expand Down Expand Up @@ -123,6 +125,18 @@ public class ClientIntegrationTest {
RowUtil.columnTypesFromStrings(ImmutableList.of("STRING", "BIGINT"));
private static final KsqlArray PULL_QUERY_EXPECTED_ROW = new KsqlArray(ImmutableList.of("FOO", 1));

private static final KsqlObject COMPLEX_FIELD_VALUE = new KsqlObject()
.put("DECIMAL", new BigDecimal("1.1"))
.put("STRUCT", new KsqlObject().put("F1", "foo").put("F2", 3))
.put("ARRAY_ARRAY", new KsqlArray().add(new KsqlArray().add("bar")))
.put("ARRAY_STRUCT", new KsqlArray().add(new KsqlObject().put("F1", "x")))
.put("ARRAY_MAP", new KsqlArray().add(new KsqlObject().put("k", 10)))
.put("MAP_ARRAY", new KsqlObject().put("k", new KsqlArray().add("e1").add("e2")))
.put("MAP_MAP", new KsqlObject().put("k1", new KsqlObject().put("k2", 5)))
.put("MAP_STRUCT", new KsqlObject().put("k", new KsqlObject().put("F1", "baz")));
private static final KsqlObject EXPECTED_COMPLEX_FIELD_VALUE = COMPLEX_FIELD_VALUE.copy()
.put("DECIMAL", 1.1d); // Expect raw decimal value, whereas put(BigDecimal) serializes as string to avoid loss of precision

private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
Expand Down Expand Up @@ -435,10 +449,12 @@ public void shouldInsertInto() throws Exception {
// Given
final KsqlObject insertRow = new KsqlObject()
.put("str", "HELLO") // Column names are case-insensitive
.put("`LONG`", 100L) // Quotes may be used to preserve case-sensitivity
.put("DEC", new BigDecimal("13.31"))
.put("`LONG`", 100L) // Backticks may be used to preserve case-sensitivity
.put("\"DEC\"", new BigDecimal("13.31")) // Double quotes may also be used to preserve case-sensitivity
.put("ARRAY", new KsqlArray().add("v1").add("v2"))
.put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", ""));
.put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", ""))
.put("STRUCT", new KsqlObject().put("f1", 12)) // Nested field names are case-insensitive
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// When
client.insertInto(EMPTY_TEST_STREAM.toLowerCase(), insertRow).get(); // Stream name is case-insensitive
Expand All @@ -454,6 +470,8 @@ public void shouldInsertInto() throws Exception {
assertThat(rows.get(0).getDecimal("DEC"), is(new BigDecimal("13.31")));
assertThat(rows.get(0).getKsqlArray("ARRAY"), is(new KsqlArray().add("v1").add("v2")));
assertThat(rows.get(0).getKsqlObject("MAP"), is(new KsqlObject().put("some_key", "a_value").put("another_key", "")));
assertThat(rows.get(0).getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 12)));
assertThat(rows.get(0).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

@Test
Expand Down Expand Up @@ -487,7 +505,9 @@ public void shouldStreamQueryWithProperties() throws Exception {
.put("LONG", 2000L)
.put("DEC", new BigDecimal("12.34"))
.put("ARRAY", new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties"))
.put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties"));
.put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties"))
.put("STRUCT", new KsqlObject().put("F1", 4))
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// When
final StreamedQueryResult queryResult = client.streamQuery(sql, properties).get();
Expand All @@ -508,6 +528,8 @@ public void shouldStreamQueryWithProperties() throws Exception {
assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34")));
assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties")));
assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldStreamQueryWithProperties")));
assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4)));
assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

@Test
Expand All @@ -522,7 +544,9 @@ public void shouldExecuteQueryWithProperties() {
.put("LONG", 2000L)
.put("DEC", new BigDecimal("12.34"))
.put("ARRAY", new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties"))
.put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"));
.put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"))
.put("STRUCT", new KsqlObject().put("F1", 4))
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// When
final BatchedQueryResult queryResult = client.executeQuery(sql, properties);
Expand Down Expand Up @@ -558,6 +582,8 @@ public void shouldExecuteQueryWithProperties() {
assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34")));
assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties")));
assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties")));
assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4)));
assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

private Client createClient() {
Expand Down Expand Up @@ -617,13 +643,17 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) {
assertThat(row.getDecimal("DEC"), is(expectedRow.getDecimal(2)));
assertThat(row.getKsqlArray("ARRAY"), is(expectedRow.getKsqlArray(3)));
assertThat(row.getKsqlObject("MAP"), is(expectedRow.getKsqlObject(4)));
assertThat(row.getKsqlObject("STRUCT"), is(expectedRow.getKsqlObject(5)));
assertThat(row.getKsqlObject("COMPLEX"), is(expectedRow.getKsqlObject(6)));

// verify index-based getters are 1-indexed
assertThat(row.getString(1), is(row.getString("STR")));
assertThat(row.getLong(2), is(row.getLong("LONG")));
assertThat(row.getDecimal(3), is(row.getDecimal("DEC")));
assertThat(row.getKsqlArray(4), is(row.getKsqlArray("ARRAY")));
assertThat(row.getKsqlObject(5), is(row.getKsqlObject("MAP")));
assertThat(row.getKsqlObject(6), is(row.getKsqlObject("STRUCT")));
assertThat(row.getKsqlObject(7), is(row.getKsqlObject("COMPLEX")));

// verify isNull() evaluation
assertThat(row.isNull("STR"), is(false));
Expand All @@ -640,6 +670,8 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) {
assertThat(values.getDecimal(2), is(row.getDecimal("DEC")));
assertThat(values.getKsqlArray(3), is(row.getKsqlArray("ARRAY")));
assertThat(values.getKsqlObject(4), is(row.getKsqlObject("MAP")));
assertThat(values.getKsqlObject(5), is(row.getKsqlObject("STRUCT")));
assertThat(values.getKsqlObject(6), is(row.getKsqlObject("COMPLEX")));
assertThat(values.toJsonString(), is((new JsonArray(values.getList())).toString()));
assertThat(values.toString(), is(values.toJsonString()));

Expand All @@ -653,6 +685,8 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) {
assertThat(obj.getDecimal("DEC"), is(row.getDecimal("DEC")));
assertThat(obj.getKsqlArray("ARRAY"), is(row.getKsqlArray("ARRAY")));
assertThat(obj.getKsqlObject("MAP"), is(row.getKsqlObject("MAP")));
assertThat(obj.getKsqlObject("STRUCT"), is(row.getKsqlObject("STRUCT")));
assertThat(obj.getKsqlObject("COMPLEX"), is(row.getKsqlObject("COMPLEX")));
assertThat(obj.containsKey("DEC"), is(true));
assertThat(obj.containsKey("notafield"), is(false));
assertThat(obj.toJsonString(), is((new JsonObject(obj.getMap())).toString()));
Expand Down Expand Up @@ -720,8 +754,18 @@ private static List<KsqlArray> convertToClientRows(final Multimap<String, Generi
final List<KsqlArray> expectedRows = new ArrayList<>();
for (final Map.Entry<String, GenericRow> entry : data.entries()) {
final KsqlArray expectedRow = new KsqlArray()
.add(entry.getKey())
.addAll(new KsqlArray(entry.getValue().values()));
.add(entry.getKey());
for (final Object value : entry.getValue().values()) {
if (value instanceof Struct) {
expectedRow.add(StructuredTypesDataProvider.structToMap((Struct) value));
} else if (value instanceof BigDecimal) {
// Can't use expectedRow.add((BigDecimal) value) directly since client serializes BigDecimal as string,
// whereas this method builds up the expected result (unrelated to serialization)
expectedRow.addAll(new KsqlArray(Collections.singletonList(value)));
} else {
expectedRow.add(value);
}
}
expectedRows.add(expectedRow);
}
return expectedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.SerdeOption;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

public class StructuredTypesDataProvider extends TestDataProvider<String> {

Expand All @@ -37,23 +44,114 @@ public class StructuredTypesDataProvider extends TestDataProvider<String> {
.valueColumn(ColumnName.of("DEC"), SqlTypes.decimal(4, 2))
.valueColumn(ColumnName.of("ARRAY"), SqlTypes.array(SqlTypes.STRING))
.valueColumn(ColumnName.of("MAP"), SqlTypes.map(SqlTypes.STRING))
.valueColumn(ColumnName.of("STRUCT"), SqlTypes.struct().field("F1", SqlTypes.INTEGER).build())
.valueColumn(ColumnName.of("COMPLEX"), SqlTypes.struct()
.field("DECIMAL", SqlTypes.decimal(2, 1))
.field("STRUCT", SqlTypes.struct()
.field("F1", SqlTypes.STRING)
.field("F2", SqlTypes.INTEGER)
.build())
.field("ARRAY_ARRAY", SqlTypes.array(SqlTypes.array(SqlTypes.STRING)))
.field("ARRAY_STRUCT", SqlTypes.array(SqlTypes.struct().field("F1", SqlTypes.STRING).build()))
.field("ARRAY_MAP", SqlTypes.array(SqlTypes.map(SqlTypes.INTEGER)))
.field("MAP_ARRAY", SqlTypes.map(SqlTypes.array(SqlTypes.STRING)))
.field("MAP_MAP", SqlTypes.map(SqlTypes.map(SqlTypes.INTEGER)))
.field("MAP_STRUCT", SqlTypes.map(SqlTypes.struct().field("F1", SqlTypes.STRING).build()))
.build()
)
.build();

private static final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema
.from(LOGICAL_SCHEMA, SerdeOption.none());

private static final Schema STRUCT_FIELD_SCHEMA = LOGICAL_SCHEMA.valueConnectSchema().field("STRUCT").schema();
private static final Schema COMPLEX_FIELD_SCHEMA = LOGICAL_SCHEMA.valueConnectSchema().field("COMPLEX").schema();

private static final Multimap<String, GenericRow> ROWS = ImmutableListMultimap
.<String, GenericRow>builder()
.put("FOO", genericRow(1L, new BigDecimal("1.11"), Collections.singletonList("a"), Collections.singletonMap("k1", "v1")))
.put("BAR", genericRow(2L, new BigDecimal("2.22"), Collections.emptyList(), Collections.emptyMap()))
.put("BAZ", genericRow(3L, new BigDecimal("30.33"), Collections.singletonList("b"), Collections.emptyMap()))
.put("BUZZ", genericRow(4L, new BigDecimal("40.44"), ImmutableList.of("c", "d"), Collections.emptyMap()))
.put("FOO", genericRow(1L, new BigDecimal("1.11"), Collections.singletonList("a"), Collections.singletonMap("k1", "v1"), generateStruct(2), generateComplexStruct(0)))
.put("BAR", genericRow(2L, new BigDecimal("2.22"), Collections.emptyList(), Collections.emptyMap(), generateStruct(3), generateComplexStruct(1)))
.put("BAZ", genericRow(3L, new BigDecimal("30.33"), Collections.singletonList("b"), Collections.emptyMap(), generateStruct(null), generateComplexStruct(2)))
.put("BUZZ", genericRow(4L, new BigDecimal("40.44"), ImmutableList.of("c", "d"), Collections.emptyMap(), generateStruct(88), generateComplexStruct(3)))
// Additional entries for repeated keys
.put("BAZ", genericRow(5L, new BigDecimal("12"), ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2")))
.put("BUZZ", genericRow(6L, new BigDecimal("10.1"), ImmutableList.of("f", "g"), Collections.emptyMap()))
.put("BAZ", genericRow(5L, new BigDecimal("12"), ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2"), generateStruct(0), generateComplexStruct(4)))
.put("BUZZ", genericRow(6L, new BigDecimal("10.1"), ImmutableList.of("f", "g"), Collections.emptyMap(), generateStruct(null), generateComplexStruct(5)))
.build();

public StructuredTypesDataProvider() {
super("STRUCTURED_TYPES", PHYSICAL_SCHEMA, ROWS);
}

@SuppressWarnings("unchecked")
public static Map<String, Object> structToMap(final Struct struct) {
return (Map<String, Object>) structToMapHelper(struct);
}

private static Object structToMapHelper(final Object value) {
if (value instanceof Struct) {
final Struct struct = (Struct) value;

final Map<String, Object> result = new HashMap<>();
for (final Field field : struct.schema().fields()) {
result.put(field.name(), structToMapHelper(struct.get(field)));
}

return result;
} else if (value instanceof List) {
final List<?> list = (List<?>) value;

final List<Object> result = new ArrayList<>();
for (final Object o : list) {
result.add(structToMapHelper(o));
}

return result;
} else if (value instanceof Map) {
final Map<?, ?> map = (Map<?, ?>) value;

final Map<String, Object> result = new HashMap<>();
for (final Map.Entry<?, ?> entry : map.entrySet()) {
result.put(entry.getKey().toString(), structToMapHelper(entry.getValue()));
}

return result;
} else {
return value;
}
}

private static Struct generateStruct(final Integer value) {
final Struct struct = new Struct(STRUCT_FIELD_SCHEMA);
struct.put("F1", value);
return struct;
}

private static Struct generateComplexStruct(final int i) {
final Struct complexStruct = new Struct(COMPLEX_FIELD_SCHEMA);

complexStruct.put("DECIMAL", new BigDecimal(i));

final Struct struct = new Struct(COMPLEX_FIELD_SCHEMA.field("STRUCT").schema());
struct.put("F1", "v" + i);
struct.put("F2", i);
complexStruct.put("STRUCT", struct);

complexStruct.put("ARRAY_ARRAY", ImmutableList.of(ImmutableList.of("foo")));

final Struct arrayStruct = new Struct(COMPLEX_FIELD_SCHEMA.field("ARRAY_STRUCT").schema().valueSchema());
arrayStruct.put("F1", "v" + i);
complexStruct.put("ARRAY_STRUCT", ImmutableList.of(arrayStruct));

complexStruct.put("ARRAY_MAP", ImmutableList.of(ImmutableMap.of("k1", i)));

complexStruct.put("MAP_ARRAY", ImmutableMap.of("k", ImmutableList.of("v" + i)));

complexStruct.put("MAP_MAP", ImmutableMap.of("k", ImmutableMap.of("k", i)));

final Struct mapStruct = new Struct(COMPLEX_FIELD_SCHEMA.field("MAP_STRUCT").schema().valueSchema());
mapStruct.put("F1", "v" + i);
complexStruct.put("MAP_STRUCT", ImmutableMap.of("k", mapStruct));

return complexStruct;
}
}
Loading

0 comments on commit 866ae34

Please sign in to comment.