Skip to content

Commit

Permalink
Merge pull request #15292: [BEAM-12628] Add Avro reflect-based Coder …
Browse files Browse the repository at this point in the history
…option
  • Loading branch information
iemejia committed Aug 6, 2021
2 parents 98747fd + e5c25a0 commit 3537f7e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> clazz) {
return new AvroCoder<>(clazz, new ReflectData(clazz.getClassLoader()).getSchema(clazz));
return of(clazz, false);
}

/**
Expand All @@ -139,6 +139,16 @@ public static AvroGenericCoder of(Schema schema) {
return AvroGenericCoder.of(schema);
}

/**
* Returns an {@code AvroCoder} instance for the given class using Avro's Reflection API for
* encoding and decoding.
*
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> type, boolean useReflectApi) {
return of(type, new ReflectData(type.getClassLoader()).getSchema(type), useReflectApi);
}

/**
* Returns an {@code AvroCoder} instance for the provided element type using the provided Avro
* schema.
Expand All @@ -148,7 +158,17 @@ public static AvroGenericCoder of(Schema schema) {
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> type, Schema schema) {
return new AvroCoder<>(type, schema);
return of(type, schema, false);
}

/**
* Returns an {@code AvroCoder} instance for the given class and schema using Avro's Reflection
* API for encoding and decoding.
*
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> type, Schema schema, boolean useReflectApi) {
return new AvroCoder<>(type, schema, useReflectApi);
}

/**
Expand Down Expand Up @@ -270,6 +290,10 @@ public ReflectData get() {
private final Supplier<ReflectData> reflectData;

protected AvroCoder(Class<T> type, Schema schema) {
this(type, schema, false);
}

protected AvroCoder(Class<T> type, Schema schema, boolean useReflectApi) {
this.type = type;
this.schemaSupplier = new SerializableSchemaSupplier(schema);
typeDescriptor = TypeDescriptor.of(type);
Expand All @@ -291,7 +315,7 @@ protected AvroCoder(Class<T> type, Schema schema) {
public DatumReader<T> initialValue() {
if (myCoder.getType().equals(GenericRecord.class)) {
return new GenericDatumReader<>(myCoder.getSchema());
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) {
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) {
return new SpecificDatumReader<>(myCoder.getType());
}
return new ReflectDatumReader<>(
Expand All @@ -307,7 +331,7 @@ public DatumReader<T> initialValue() {
public DatumWriter<T> initialValue() {
if (myCoder.getType().equals(GenericRecord.class)) {
return new GenericDatumWriter<>(myCoder.getSchema());
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) {
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) {
return new SpecificDatumWriter<>(myCoder.getType());
}
return new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,19 @@ public void testSpecificRecordEncoding() throws Exception {
CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
}

@Test
public void testReflectRecordEncoding() throws Exception {
AvroCoder<TestAvro> coder = AvroCoder.of(TestAvro.class, true);
AvroCoder<TestAvro> coderWithSchema =
AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), true);

assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType()));
assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType()));

CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD);
}

@Test
public void testGenericRecordEncoding() throws Exception {
String schemaString =
Expand Down Expand Up @@ -401,6 +414,14 @@ public void testAvroCoderIsSerializable() throws Exception {
SerializableUtils.ensureSerializable(coder);
}

@Test
public void testAvroReflectCoderIsSerializable() throws Exception {
AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class, true);

// Check that the coder is serializable using the regular JSON approach.
SerializableUtils.ensureSerializable(coder);
}

private void assertDeterministic(AvroCoder<?> coder) {
try {
coder.verifyDeterministic();
Expand Down

0 comments on commit 3537f7e

Please sign in to comment.