From e5c25a052b598f0888b919015c527f90c0c028d0 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 26 Jul 2021 12:33:16 -0400 Subject: [PATCH] [BEAM-12628] Add Avro reflect-based Coder option --- .../org/apache/beam/sdk/coders/AvroCoder.java | 32 ++++++++++++++++--- .../apache/beam/sdk/coders/AvroCoderTest.java | 21 ++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index e9c6793a5d63..b2367837cc45 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -128,7 +128,7 @@ public static AvroCoder of(TypeDescriptor type) { * @param the element type */ public static AvroCoder of(Class clazz) { - return new AvroCoder<>(clazz, new ReflectData(clazz.getClassLoader()).getSchema(clazz)); + return of(clazz, false); } /** @@ -139,6 +139,16 @@ public static AvroGenericCoder of(Schema schema) { return AvroGenericCoder.of(schema); } + /** + * Returns an {@code AvroCoder} instance for the given class using Avro's Reflection API for + * encoding and decoding. + * + * @param the element type + */ + public static AvroCoder of(Class type, boolean useReflectApi) { + return of(type, new ReflectData(type.getClassLoader()).getSchema(type), useReflectApi); + } + /** * Returns an {@code AvroCoder} instance for the provided element type using the provided Avro * schema. @@ -148,7 +158,17 @@ public static AvroGenericCoder of(Schema schema) { * @param the element type */ public static AvroCoder of(Class type, Schema schema) { - return new AvroCoder<>(type, schema); + return of(type, schema, false); + } + + /** + * Returns an {@code AvroCoder} instance for the given class and schema using Avro's Reflection + * API for encoding and decoding. + * + * @param the element type + */ + public static AvroCoder of(Class type, Schema schema, boolean useReflectApi) { + return new AvroCoder<>(type, schema, useReflectApi); } /** @@ -270,6 +290,10 @@ public ReflectData get() { private final Supplier reflectData; protected AvroCoder(Class type, Schema schema) { + this(type, schema, false); + } + + protected AvroCoder(Class type, Schema schema, boolean useReflectApi) { this.type = type; this.schemaSupplier = new SerializableSchemaSupplier(schema); typeDescriptor = TypeDescriptor.of(type); @@ -291,7 +315,7 @@ protected AvroCoder(Class type, Schema schema) { public DatumReader initialValue() { if (myCoder.getType().equals(GenericRecord.class)) { return new GenericDatumReader<>(myCoder.getSchema()); - } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) { + } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) { return new SpecificDatumReader<>(myCoder.getType()); } return new ReflectDatumReader<>( @@ -307,7 +331,7 @@ public DatumReader initialValue() { public DatumWriter initialValue() { if (myCoder.getType().equals(GenericRecord.class)) { return new GenericDatumWriter<>(myCoder.getSchema()); - } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) { + } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) { return new SpecificDatumWriter<>(myCoder.getType()); } return new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 522136c73705..d7886c30b661 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -329,6 +329,19 @@ public void testSpecificRecordEncoding() throws Exception { CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); } + @Test + public void testReflectRecordEncoding() throws Exception { + AvroCoder coder = AvroCoder.of(TestAvro.class, true); + AvroCoder coderWithSchema = + AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), true); + + assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType())); + assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType())); + + CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); + CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD); + } + @Test public void testGenericRecordEncoding() throws Exception { String schemaString = @@ -401,6 +414,14 @@ public void testAvroCoderIsSerializable() throws Exception { SerializableUtils.ensureSerializable(coder); } + @Test + public void testAvroReflectCoderIsSerializable() throws Exception { + AvroCoder coder = AvroCoder.of(Pojo.class, true); + + // Check that the coder is serializable using the regular JSON approach. + SerializableUtils.ensureSerializable(coder); + } + private void assertDeterministic(AvroCoder coder) { try { coder.verifyDeterministic();