From 0ffadca87ecbebbd050f368ce669ceb479f5da05 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 26 Jul 2021 12:33:16 -0400 Subject: [PATCH 1/6] (BEAM-12628) Offer Avro reflect-based Coder option --- .../org/apache/beam/sdk/coders/AvroCoder.java | 50 ++++++++++++++++++- .../apache/beam/sdk/coders/AvroCoderTest.java | 25 ++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index e9c6793a5d63..7d59544ae8e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -139,6 +139,41 @@ public static AvroGenericCoder of(Schema schema) { return AvroGenericCoder.of(schema); } + /** + * Returns an {@code AvroCoder} instance for the given class using Avro's Reflection API for + * encoding and decoding. + * + *

The ReflectData must correspond to the type provided. Note that custom ClassLoaders are not + * supported with ReflectData as they can't be properly serialized, so the default Avro + * ClassLoader will be used. + * + * @param the element type + */ + public static AvroCoder of(Class type, ReflectData reflectData) { + return of(type, reflectData.getSchema(type), reflectData); + } + + /** + * Returns an {@code AvroCoder} instance for the given class and schema using Avro's Reflection + * API for encoding and decoding. + * + *

The ReflectData must correspond to the type provided. Note that custom ClassLoaders are not + * supported with ReflectData as they can't be properly serialized, so the default Avro + * ClassLoader will be used. + * + * @param the element type + */ + public static AvroCoder of(Class type, Schema schema, ReflectData reflectData) { + if (reflectData.getClassLoader().getClass() != type.getClassLoader().getClass()) { + throw new AvroRuntimeException( + String.format( + "ReflectData with a different ClassLoader than Avro class %s is not supported in AvroCoder: %s was not equal to %s", + type, reflectData.getClassLoader().getClass(), type.getClassLoader().getClass())); + } + + return new AvroCoder<>(type, schema, reflectData); + } + /** * Returns an {@code AvroCoder} instance for the provided element type using the provided Avro * schema. @@ -270,6 +305,10 @@ public ReflectData get() { private final Supplier reflectData; protected AvroCoder(Class type, Schema schema) { + this(type, schema, null); + } + + protected AvroCoder(Class type, Schema schema, ReflectData reflectData) { this.type = type; this.schemaSupplier = new SerializableSchemaSupplier(schema); typeDescriptor = TypeDescriptor.of(type); @@ -280,6 +319,7 @@ protected AvroCoder(Class type, Schema schema) { this.decoder = new EmptyOnDeserializationThreadLocal<>(); this.encoder = new EmptyOnDeserializationThreadLocal<>(); + final boolean useAvroReflection = reflectData != null; this.reflectData = Suppliers.memoize(new SerializableReflectDataSupplier(getType())); // Reader and writer are allocated once per thread per Coder @@ -289,11 +329,15 @@ protected AvroCoder(Class type, Schema schema) { @Override public DatumReader initialValue() { - if (myCoder.getType().equals(GenericRecord.class)) { + if (useAvroReflection) { + return new ReflectDatumReader<>( + myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get()); + } else if (myCoder.getType().equals(GenericRecord.class)) { return new GenericDatumReader<>(myCoder.getSchema()); } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) { return new SpecificDatumReader<>(myCoder.getType()); } + return new ReflectDatumReader<>( myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get()); } @@ -305,7 +349,9 @@ public DatumReader initialValue() { @Override public DatumWriter initialValue() { - if (myCoder.getType().equals(GenericRecord.class)) { + if (useAvroReflection) { + return new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get()); + } else if (myCoder.getType().equals(GenericRecord.class)) { return new GenericDatumWriter<>(myCoder.getSchema()); } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) { return new SpecificDatumWriter<>(myCoder.getType()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 522136c73705..14d5bd8e31bb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -329,6 +329,23 @@ public void testSpecificRecordEncoding() throws Exception { CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); } + @Test + public void testReflectRecordEncoding() throws Exception { + // AvroCoder coder = AvroCoder.of(TestAvro.class, new + // ReflectData(TestAvro.class.getClassLoader())); + AvroCoder coderWithSchema = + AvroCoder.of( + TestAvro.class, + AVRO_SPECIFIC_RECORD.getSchema(), + new ReflectData(TestAvro.class.getClassLoader())); + + // assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType())); + assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType())); + + // CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); + CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD); + } + @Test public void testGenericRecordEncoding() throws Exception { String schemaString = @@ -401,6 +418,14 @@ public void testAvroCoderIsSerializable() throws Exception { SerializableUtils.ensureSerializable(coder); } + @Test + public void testAvroReflectCoderIsSerializable() throws Exception { + AvroCoder coder = AvroCoder.of(Pojo.class, new ReflectData(Pojo.class.getClassLoader())); + + // Check that the coder is serializable using the regular JSON approach. + SerializableUtils.ensureSerializable(coder); + } + private void assertDeterministic(AvroCoder coder) { try { coder.verifyDeterministic(); From cd5204f6729eb32711560480c1d1d76d09c96933 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 26 Jul 2021 12:34:59 -0400 Subject: [PATCH 2/6] Cleanup --- .../src/main/java/org/apache/beam/sdk/coders/AvroCoder.java | 1 - .../test/java/org/apache/beam/sdk/coders/AvroCoderTest.java | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 7d59544ae8e6..ba30bee772cd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -337,7 +337,6 @@ public DatumReader initialValue() { } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) { return new SpecificDatumReader<>(myCoder.getType()); } - return new ReflectDatumReader<>( myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 14d5bd8e31bb..583fd6ff516f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -331,15 +331,15 @@ public void testSpecificRecordEncoding() throws Exception { @Test public void testReflectRecordEncoding() throws Exception { - // AvroCoder coder = AvroCoder.of(TestAvro.class, new - // ReflectData(TestAvro.class.getClassLoader())); + AvroCoder coder = AvroCoder.of(TestAvro.class, new + ReflectData(TestAvro.class.getClassLoader())); AvroCoder coderWithSchema = AvroCoder.of( TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), new ReflectData(TestAvro.class.getClassLoader())); - // assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType())); + assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType())); assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType())); // CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); From bf4be941ef42e5993b70a0e43e803401a1a9e07f Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 26 Jul 2021 12:35:31 -0400 Subject: [PATCH 3/6] Cleanup --- .../src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 583fd6ff516f..8437beafef56 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -342,7 +342,7 @@ public void testReflectRecordEncoding() throws Exception { assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType())); assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType())); - // CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); + CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD); } From b3fe7641a68c16b1b02da579fe72376fb1934297 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Fri, 30 Jul 2021 16:19:50 -0400 Subject: [PATCH 4/6] simplify signature to just a boolean --- .../org/apache/beam/sdk/coders/AvroCoder.java | 41 +++++-------------- .../apache/beam/sdk/coders/AvroCoderTest.java | 10 ++--- 2 files changed, 13 insertions(+), 38 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index ba30bee772cd..17d358fce3cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -143,35 +143,20 @@ public static AvroGenericCoder of(Schema schema) { * Returns an {@code AvroCoder} instance for the given class using Avro's Reflection API for * encoding and decoding. * - *

The ReflectData must correspond to the type provided. Note that custom ClassLoaders are not - * supported with ReflectData as they can't be properly serialized, so the default Avro - * ClassLoader will be used. - * * @param the element type */ - public static AvroCoder of(Class type, ReflectData reflectData) { - return of(type, reflectData.getSchema(type), reflectData); + public static AvroCoder of(Class type, boolean useReflectApi) { + return of(type, new ReflectData(type.getClassLoader()).getSchema(type), useReflectApi); } /** * Returns an {@code AvroCoder} instance for the given class and schema using Avro's Reflection * API for encoding and decoding. * - *

The ReflectData must correspond to the type provided. Note that custom ClassLoaders are not - * supported with ReflectData as they can't be properly serialized, so the default Avro - * ClassLoader will be used. - * * @param the element type */ - public static AvroCoder of(Class type, Schema schema, ReflectData reflectData) { - if (reflectData.getClassLoader().getClass() != type.getClassLoader().getClass()) { - throw new AvroRuntimeException( - String.format( - "ReflectData with a different ClassLoader than Avro class %s is not supported in AvroCoder: %s was not equal to %s", - type, reflectData.getClassLoader().getClass(), type.getClassLoader().getClass())); - } - - return new AvroCoder<>(type, schema, reflectData); + public static AvroCoder of(Class type, Schema schema, boolean useReflectApi) { + return new AvroCoder<>(type, schema, useReflectApi); } /** @@ -305,10 +290,10 @@ public ReflectData get() { private final Supplier reflectData; protected AvroCoder(Class type, Schema schema) { - this(type, schema, null); + this(type, schema, false); } - protected AvroCoder(Class type, Schema schema, ReflectData reflectData) { + protected AvroCoder(Class type, Schema schema, boolean useReflectApi) { this.type = type; this.schemaSupplier = new SerializableSchemaSupplier(schema); typeDescriptor = TypeDescriptor.of(type); @@ -319,7 +304,6 @@ protected AvroCoder(Class type, Schema schema, ReflectData reflectData) { this.decoder = new EmptyOnDeserializationThreadLocal<>(); this.encoder = new EmptyOnDeserializationThreadLocal<>(); - final boolean useAvroReflection = reflectData != null; this.reflectData = Suppliers.memoize(new SerializableReflectDataSupplier(getType())); // Reader and writer are allocated once per thread per Coder @@ -329,12 +313,9 @@ protected AvroCoder(Class type, Schema schema, ReflectData reflectData) { @Override public DatumReader initialValue() { - if (useAvroReflection) { - return new ReflectDatumReader<>( - myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get()); - } else if (myCoder.getType().equals(GenericRecord.class)) { + if (myCoder.getType().equals(GenericRecord.class)) { return new GenericDatumReader<>(myCoder.getSchema()); - } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) { + } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) { return new SpecificDatumReader<>(myCoder.getType()); } return new ReflectDatumReader<>( @@ -348,11 +329,9 @@ public DatumReader initialValue() { @Override public DatumWriter initialValue() { - if (useAvroReflection) { - return new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get()); - } else if (myCoder.getType().equals(GenericRecord.class)) { + if (myCoder.getType().equals(GenericRecord.class)) { return new GenericDatumWriter<>(myCoder.getSchema()); - } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType())) { + } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) { return new SpecificDatumWriter<>(myCoder.getType()); } return new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 8437beafef56..d7886c30b661 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -331,13 +331,9 @@ public void testSpecificRecordEncoding() throws Exception { @Test public void testReflectRecordEncoding() throws Exception { - AvroCoder coder = AvroCoder.of(TestAvro.class, new - ReflectData(TestAvro.class.getClassLoader())); + AvroCoder coder = AvroCoder.of(TestAvro.class, true); AvroCoder coderWithSchema = - AvroCoder.of( - TestAvro.class, - AVRO_SPECIFIC_RECORD.getSchema(), - new ReflectData(TestAvro.class.getClassLoader())); + AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), true); assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType())); assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType())); @@ -420,7 +416,7 @@ public void testAvroCoderIsSerializable() throws Exception { @Test public void testAvroReflectCoderIsSerializable() throws Exception { - AvroCoder coder = AvroCoder.of(Pojo.class, new ReflectData(Pojo.class.getClassLoader())); + AvroCoder coder = AvroCoder.of(Pojo.class, true); // Check that the coder is serializable using the regular JSON approach. SerializableUtils.ensureSerializable(coder); From 48dc93b2f4f66bb229345f31c01066abc4fb504e Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 5 Aug 2021 16:34:35 -0400 Subject: [PATCH 5/6] delegate to overloaded method --- .../src/main/java/org/apache/beam/sdk/coders/AvroCoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 17d358fce3cb..06003af9a018 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -128,7 +128,7 @@ public static AvroCoder of(TypeDescriptor type) { * @param the element type */ public static AvroCoder of(Class clazz) { - return new AvroCoder<>(clazz, new ReflectData(clazz.getClassLoader()).getSchema(clazz)); + return of(clazz, false); } /** From cfe9ac6a7161893006785228ce8c5b562238860a Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 5 Aug 2021 21:19:08 -0400 Subject: [PATCH 6/6] reorder --- .../org/apache/beam/sdk/coders/AvroCoder.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 06003af9a018..b2367837cc45 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -150,25 +150,25 @@ public static AvroCoder of(Class type, boolean useReflectApi) { } /** - * Returns an {@code AvroCoder} instance for the given class and schema using Avro's Reflection - * API for encoding and decoding. + * Returns an {@code AvroCoder} instance for the provided element type using the provided Avro + * schema. + * + *

The schema must correspond to the type provided. * * @param the element type */ - public static AvroCoder of(Class type, Schema schema, boolean useReflectApi) { - return new AvroCoder<>(type, schema, useReflectApi); + public static AvroCoder of(Class type, Schema schema) { + return of(type, schema, false); } /** - * Returns an {@code AvroCoder} instance for the provided element type using the provided Avro - * schema. - * - *

The schema must correspond to the type provided. + * Returns an {@code AvroCoder} instance for the given class and schema using Avro's Reflection + * API for encoding and decoding. * * @param the element type */ - public static AvroCoder of(Class type, Schema schema) { - return new AvroCoder<>(type, schema); + public static AvroCoder of(Class type, Schema schema, boolean useReflectApi) { + return new AvroCoder<>(type, schema, useReflectApi); } /**