Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-12628][cherry-pick] add ReflectDatum{Reader,Writer} option for AvroCoder #15295

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> clazz) {
return new AvroCoder<>(clazz, new ReflectData(clazz.getClassLoader()).getSchema(clazz));
return of(clazz, false);
}

/**
Expand All @@ -139,6 +139,16 @@ public static AvroGenericCoder of(Schema schema) {
return AvroGenericCoder.of(schema);
}

/**
* Returns an {@code AvroCoder} instance for the given class using Avro's Reflection API for
* encoding and decoding.
*
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> type, boolean useReflectApi) {
return of(type, new ReflectData(type.getClassLoader()).getSchema(type), useReflectApi);
}

/**
* Returns an {@code AvroCoder} instance for the provided element type using the provided Avro
* schema.
Expand All @@ -148,7 +158,17 @@ public static AvroGenericCoder of(Schema schema) {
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> type, Schema schema) {
return new AvroCoder<>(type, schema);
return of(type, schema, false);
}

/**
* Returns an {@code AvroCoder} instance for the given class and schema using Avro's Reflection
* API for encoding and decoding.
*
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(Class<T> type, Schema schema, boolean useReflectApi) {
return new AvroCoder<>(type, schema, useReflectApi);
}

/**
Expand Down Expand Up @@ -270,6 +290,10 @@ public ReflectData get() {
private final Supplier<ReflectData> reflectData;

protected AvroCoder(Class<T> type, Schema schema) {
this(type, schema, false);
}

protected AvroCoder(Class<T> type, Schema schema, boolean useReflectApi) {
this.type = type;
this.schemaSupplier = new SerializableSchemaSupplier(schema);
typeDescriptor = TypeDescriptor.of(type);
Expand All @@ -291,7 +315,7 @@ protected AvroCoder(Class<T> type, Schema schema) {
public DatumReader<T> initialValue() {
if (myCoder.getType().equals(GenericRecord.class)) {
return new GenericDatumReader<>(myCoder.getSchema());
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) {
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) {
return new SpecificDatumReader<>(myCoder.getType());
}
return new ReflectDatumReader<>(
Expand All @@ -307,7 +331,7 @@ public DatumReader<T> initialValue() {
public DatumWriter<T> initialValue() {
if (myCoder.getType().equals(GenericRecord.class)) {
return new GenericDatumWriter<>(myCoder.getSchema());
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) {
} else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) {
return new SpecificDatumWriter<>(myCoder.getType());
}
return new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,19 @@ public void testSpecificRecordEncoding() throws Exception {
CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
}

@Test
public void testReflectRecordEncoding() throws Exception {
AvroCoder<TestAvro> coder = AvroCoder.of(TestAvro.class, true);
AvroCoder<TestAvro> coderWithSchema =
AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), true);

assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType()));
assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType()));

CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD);
}

@Test
public void testGenericRecordEncoding() throws Exception {
String schemaString =
Expand Down Expand Up @@ -401,6 +414,14 @@ public void testAvroCoderIsSerializable() throws Exception {
SerializableUtils.ensureSerializable(coder);
}

@Test
public void testAvroReflectCoderIsSerializable() throws Exception {
AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class, true);

// Check that the coder is serializable using the regular JSON approach.
SerializableUtils.ensureSerializable(coder);
}

private void assertDeterministic(AvroCoder<?> coder) {
try {
coder.verifyDeterministic();
Expand Down