Skip to content

Commit

Permalink
[fix][elasticsearch-sink] Handle Avro collections native types (Gener…
Browse files Browse the repository at this point in the history
…icData.Array and Utf8 map keys) (#75)
  • Loading branch information
nicoloboschi authored Apr 26, 2022
1 parent 1958af2 commit 8292dcf
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;

import java.util.*;
import java.util.stream.Collectors;

/**
* Convert an AVRO GenericRecord to a JsonNode.
Expand Down Expand Up @@ -86,18 +88,26 @@ public static JsonNode toJson(ElasticSearchConfig elasticSearchConfig, Schema sc
case ARRAY: {
Schema elementSchema = schema.getElementType();
ArrayNode arrayNode = jsonNodeFactory.arrayNode();
for (Object elem : (Object[]) value) {
Object[] iterable;
if (value instanceof GenericData.Array) {
iterable = ((GenericData.Array) value).toArray();
} else {
iterable = (Object[]) value;
}
for (Object elem : iterable) {
JsonNode fieldValue = toJson(elasticSearchConfig, elementSchema, elem);
arrayNode.add(fieldValue);
}
return arrayNode;
}
case MAP: {
Map<String, Object> map = (Map<String, Object>) value;
Map<Object, Object> map = (Map<Object, Object>) value;
ObjectNode objectNode = jsonNodeFactory.objectNode();
for (Map.Entry<String, Object> entry : map.entrySet()) {
for (Map.Entry<Object, Object> entry : map.entrySet()) {
JsonNode jsonNode = toJson(elasticSearchConfig, schema.getValueType(), entry.getValue());
objectNode.set(entry.getKey(), jsonNode);
// can be a String or org.apache.avro.util.Utf8
final String entryKey = entry.getKey() == null ? null : entry.getKey().toString();
objectNode.set(entryKey, jsonNode);
}
return objectNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class JsonConverterTests {

@Test
public void testAvroToJson() throws IOException {
Schema avroArraySchema = SchemaBuilder.array().items(SchemaBuilder.builder().stringType());
Schema mapUtf8Schema = SchemaBuilder.map().values(SchemaBuilder.builder().intType());
Schema schema = SchemaBuilder.record("record").fields()
.name("n").type().longType().longDefault(10)
.name("l").type().longType().longDefault(10)
Expand All @@ -55,7 +57,9 @@ public void testAvroToJson() throws IOException {
.name("fi").type().fixed("fi").size(3).fixedDefault(new byte[]{1,2,3})
.name("en").type().enumeration("en").symbols("a","b","c").enumDefault("b")
.name("array").type().optional().array().items(SchemaBuilder.builder().stringType())
.name("arrayavro").type().optional().array().items(SchemaBuilder.builder().stringType())
.name("map").type().optional().map().values(SchemaBuilder.builder().intType())
.name("maputf8").type().optional().map().values(SchemaBuilder.builder().intType())
.endRecord();
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("n", null);
Expand All @@ -69,7 +73,9 @@ public void testAvroToJson() throws IOException {
genericRecord.put("fi", GenericData.get().createFixed(null, new byte[]{'a','b','c'}, schema.getField("fi").schema()));
genericRecord.put("en", GenericData.get().createEnum("b", schema.getField("en").schema()));
genericRecord.put("array", new String[] {"toto"});
genericRecord.put("arrayavro", new GenericData.Array<>(avroArraySchema, Arrays.asList("toto")));
genericRecord.put("map", ImmutableMap.of("a",10));
genericRecord.put("maputf8", ImmutableMap.of(new org.apache.avro.util.Utf8("a"),10));
JsonNode jsonNode = JsonConverter.toJson(ElasticSearchConfig.load(ImmutableMap.of()), genericRecord);
assertEquals(jsonNode.get("n"), NullNode.getInstance());
assertEquals(jsonNode.get("l").asLong(), 1L);
Expand All @@ -83,9 +89,14 @@ public void testAvroToJson() throws IOException {
assertEquals(jsonNode.get("s").asText(), "toto");
assertTrue(jsonNode.get("array").isArray());
assertEquals(jsonNode.get("array").iterator().next().asText(), "toto");
assertTrue(jsonNode.get("arrayavro").isArray());
assertEquals(jsonNode.get("arrayavro").iterator().next().asText(), "toto");
assertTrue(jsonNode.get("map").isObject());
assertEquals(jsonNode.get("map").elements().next().asText(), "10");
assertEquals(jsonNode.get("map").get("a").numberValue(), 10);
assertTrue(jsonNode.get("maputf8").isObject());
assertEquals(jsonNode.get("maputf8").elements().next().asText(), "10");
assertEquals(jsonNode.get("maputf8").get("a").numberValue(), 10);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.google.common.collect.ImmutableMap;
import io.vertx.core.http.RequestOptions;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
Expand Down Expand Up @@ -61,6 +66,9 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
public static final class SimplePojo {
private String field1;
private String field2;
private List<Integer> list1;
private Set<Long> set1;
private Map<String, String> map1;
}

/**
Expand Down Expand Up @@ -129,9 +137,20 @@ public void produceMessage(int numMessages, PulsarClient client,
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
kvs.put(key, key);
final SimplePojo keyPojo = new SimplePojo(
"f1_" + i,
"f2_" + i,
Arrays.asList(i, i +1),
new HashSet<>(Arrays.asList((long) i)),
ImmutableMap.of("map1_k_" + i, "map1_kv_" + i));
final SimplePojo valuePojo = new SimplePojo(
"f1_" + i,
"f2_" + i,
Arrays.asList(i, i +1),
new HashSet<>(Arrays.asList((long) i)),
ImmutableMap.of("map1_v_" + i, "map1_vv_" + i));
producer.newMessage()
.value(new KeyValue<>(new SimplePojo("f1_" + i, "f2_" + i),
new SimplePojo("v1_" + i, "v2_" + i)))
.value(new KeyValue<>(keyPojo, valuePojo))
.send();
}

Expand Down

0 comments on commit 8292dcf

Please sign in to comment.