From 9487a8bfbcd06feca62dd7c2e53efd4de3c1dd96 Mon Sep 17 00:00:00 2001 From: Krisso <105232495+krisso-rtb@users.noreply.github.com> Date: Wed, 24 Jan 2024 16:51:31 +0100 Subject: [PATCH] [fast-avro] FastGenericDatumReader forwards setSchema() to coldDeserializer (#534) * TDD approach - adding unit test which should pass but it fails. * Some minor code cleanup. * [fast-avro][bugfix] Delegating setSchema() call to coldDeserializer from FastGenericDatumReader. It's needed to deserialize 1st record(s) from file using DataFileStream. --- .../src/test/avro/simpleTestRecord.avsc | 22 +++++ .../file/FastSerdeWithDataFileStreamTest.java | 88 +++++++++++++++++++ .../logical/types/LogicalTypesTestBase.java | 1 - .../avro/fastserde/FastDeserializer.java | 14 ++- .../fastserde/FastGenericDatumReader.java | 6 +- .../fastserde/FastGenericDatumWriter.java | 6 +- .../avro/fastserde/FastSerdeCache.java | 6 +- .../avro/fastserde/FastSerdeUtils.java | 18 ++-- .../compatibility/RandomRecordGenerator.java | 2 +- 9 files changed, 145 insertions(+), 18 deletions(-) create mode 100644 fastserde/avro-fastserde-tests-common/src/test/avro/simpleTestRecord.avsc create mode 100644 fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java diff --git a/fastserde/avro-fastserde-tests-common/src/test/avro/simpleTestRecord.avsc b/fastserde/avro-fastserde-tests-common/src/test/avro/simpleTestRecord.avsc new file mode 100644 index 000000000..65de811ed --- /dev/null +++ b/fastserde/avro-fastserde-tests-common/src/test/avro/simpleTestRecord.avsc @@ -0,0 +1,22 @@ +{ + "type": "record", + "name": "SimpleTestRecord", + "namespace": "com.linkedin.avro.fastserde.generated.avro", + "doc": "Used in tests of fast-serde to verify writing records by DataFileWriter and reading by DataFileReader/DataFileStream", + "fields": [ + { + "name": "text", + "type": "string", + "default": "" + }, + { + "name": "fiveBytes", + "type": { + "name": "Fixed5", + "type": "fixed", + "size": 5 + }, + "default": "Fizyk" + } + ] +} diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java new file mode 100644 index 000000000..63eed635d --- /dev/null +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java @@ -0,0 +1,88 @@ +package com.linkedin.avro.fastserde.file; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.DatumReader; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.linkedin.avro.fastserde.FastGenericDatumReader; +import com.linkedin.avro.fastserde.FastSpecificDatumReader; +import com.linkedin.avro.fastserde.FastSpecificDatumWriter; +import com.linkedin.avro.fastserde.generated.avro.Fixed5; +import com.linkedin.avro.fastserde.generated.avro.SimpleTestRecord; +import com.linkedin.avroutil1.compatibility.AvroRecordUtil; + +public class FastSerdeWithDataFileStreamTest { + + @DataProvider + private Object[][] dataFileStreamDeserializationTestCases() { + Schema readerSchema = SimpleTestRecord.SCHEMA$; + return new Object[][]{ + new Object[]{11, new FastSpecificDatumReader<>(null, readerSchema)}, + new Object[]{12, new FastGenericDatumReader(null, readerSchema)}, + }; + } + + @Test(groups = "deserializationTest", dataProvider = "dataFileStreamDeserializationTestCases") + void dataFileStreamShouldReadDataUsingSpecificReader(int recordsToWrite, + DatumReader datumReader) throws IOException { + // given: records to be written to one file + List records = new ArrayList<>(recordsToWrite); + for (byte i = 0; i < recordsToWrite; i++) { + Fixed5 fiveBytes = new Fixed5(); + fiveBytes.bytes(new byte[]{'K', 'r', 'i', 's', i}); + + SimpleTestRecord simpleTestRecord = new SimpleTestRecord(); + AvroRecordUtil.setField(simpleTestRecord, "fiveBytes", fiveBytes); + AvroRecordUtil.setField(simpleTestRecord, "text", "text-" + i); + + records.add(simpleTestRecord); + } + + // given: bytes array representing content of persistent file with schema and multiple records + byte[] bytes = writeTestRecordsToFile(records); + + // when: pre-populated bytes array is consumed by DataFileStream (in tests more convenient than DataFileReader + // because SeekableByteArrayInput is not available for older Avro versions) + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + DataFileStream dataFileStream = new DataFileStream<>(inputStream, datumReader); + + // then: records read from file are the same as records sent to file + int idx = 0; + for (IndexedRecord recordReadFromFile : dataFileStream) { + Assert.assertEquals(recordReadFromFile.toString(), records.get(idx++).toString()); + } + } + + /** + * @return bytes array representing file content + */ + private static byte[] writeTestRecordsToFile(List records) throws IOException { + Schema schema = SimpleTestRecord.SCHEMA$; + FastSpecificDatumWriter datumWriter = new FastSpecificDatumWriter<>(schema); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + try (DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(schema, outputStream); + + for (SimpleTestRecord record : records) { + dataFileWriter.append(record); + } + + dataFileWriter.flush(); + } + + return outputStream.toByteArray(); + } +} diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/logical/types/LogicalTypesTestBase.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/logical/types/LogicalTypesTestBase.java index 86d3cce70..a4fc04058 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/logical/types/LogicalTypesTestBase.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/logical/types/LogicalTypesTestBase.java @@ -42,7 +42,6 @@ import com.linkedin.avro.fastserde.FastDeserializer; import com.linkedin.avro.fastserde.FastGenericDeserializerGenerator; import com.linkedin.avro.fastserde.FastGenericSerializerGenerator; -import com.linkedin.avro.fastserde.FastSerdeCache; import com.linkedin.avro.fastserde.FastSerializer; import com.linkedin.avro.fastserde.FastSpecificDeserializerGenerator; import com.linkedin.avro.fastserde.FastSpecificSerializerGenerator; diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java index 3d06bee27..f7f4097c0 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java @@ -2,8 +2,8 @@ import com.linkedin.avro.fastserde.customized.DatumReaderCustomization; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; + +import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import static com.linkedin.avro.fastserde.customized.DatumReaderCustomization.*; @@ -19,5 +19,15 @@ default T deserialize(T reuse, Decoder d) throws IOException { return deserialize(reuse, d, DEFAULT_DATUM_READER_CUSTOMIZATION); } + /** + * Set the writer's schema. + * @see org.apache.avro.io.DatumReader#setSchema(Schema) + */ + default void setSchema(Schema writerSchema) { + // Implement this method only in vanilla-avro-based classes (e.g. fallback scenario). + // Normally for generated deserializers it doesn't make sense. + throw new UnsupportedOperationException("Can't change schema for already generated class."); + } + T deserialize(T reuse, Decoder d, DatumReaderCustomization customization) throws IOException; } diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java index efe3974ee..213d07766 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java @@ -87,6 +87,8 @@ public void setSchema(Schema schema) { if (readerSchema == null) { readerSchema = writerSchema; } + + coldDeserializer.setSchema(schema); } @Override @@ -99,7 +101,7 @@ public T read(T reuse, Decoder in) throws IOException { fastDeserializer = getFastDeserializerFromCache(cache, writerSchema, readerSchema, modelData, customization); if (fastDeserializer.hasDynamicClassGenerationDone()) { if (fastDeserializer.isBackedByGeneratedClass()) { - /** + /* * Runtime class generation is done successfully, so cache it. */ cachedFastDeserializer.compareAndSet(null, fastDeserializer); @@ -108,7 +110,7 @@ public T read(T reuse, Decoder in) throws IOException { + readerSchema + "], writer schema: [" + writerSchema + "]"); } } else { - /** + /* * Runtime class generation fails, so this class will cache a newly generated cold deserializer, which will * honer {@link FastSerdeCache#isFailFast()}. */ diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java index eb467a728..56228b188 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java @@ -76,7 +76,7 @@ public void write(T data, Encoder out) throws IOException { if (fastSerializer.hasDynamicClassGenerationDone()) { if (fastSerializer.isBackedByGeneratedClass()) { - /** + /* * Runtime class generation is done successfully, so cache it. */ cachedFastSerializer = fastSerializer; @@ -84,7 +84,7 @@ public void write(T data, Encoder out) throws IOException { LOGGER.debug("FastSerializer has been generated and cached for writer schema: [" + writerSchema + "]"); } } else { - /** + /* * Runtime class generation fails, so this class will cache a newly generated cold deserializer, which will * honer {@link FastSerdeCache#isFailFast()}. */ @@ -95,7 +95,7 @@ public void write(T data, Encoder out) throws IOException { } fastSerializer = cachedFastSerializer; } else { - /** + /* * Don't use the cached serializer since it may not support the passed customization. */ fastSerializer = coldSerializer; diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java index 992049648..29503d54d 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java @@ -183,7 +183,7 @@ public static boolean isSupportedForFastSerializer(Schema.Type schemaType) { Schema.Type.ARRAY); } - public static boolean isFastDeserializer(FastDeserializer deserializer) { + public static boolean isFastDeserializer(FastDeserializer deserializer) { return deserializer.isBackedByGeneratedClass(); } @@ -476,7 +476,7 @@ private FastDeserializer buildSpecificDeserializer(Schema writerSchema, Schem LOGGER.error("Deserializer class instantiation exception", e); } - return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl(writerSchema, readerSchema, modelData, customization, failFast, true); + return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true); } /** @@ -536,7 +536,7 @@ private FastDeserializer buildGenericDeserializer(Schema writerSchema, Schema LOGGER.error("Deserializer class instantiation exception:", e); } - return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl(writerSchema, readerSchema, modelData, customization, failFast, true); + return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true); } public FastSerializer buildFastSpecificSerializer(Schema schema, SpecificData modelData) { diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java index ba590bd6e..333ac419b 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java @@ -8,14 +8,10 @@ import org.apache.avro.generic.CustomizedSpecificDatumWriter; import com.linkedin.avro.fastserde.customized.DatumReaderCustomization; import com.linkedin.avro.fastserde.customized.DatumWriterCustomization; -import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import java.io.IOException; import org.apache.avro.Schema; -import org.apache.avro.generic.ColdGenericDatumReader; -import org.apache.avro.generic.ColdSpecificDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Decoder; import org.apache.avro.io.Encoder; import org.apache.avro.specific.SpecificData; @@ -50,12 +46,17 @@ public FastDeserializerWithAvroSpecificImpl(Schema writerSchema, Schema readerSc SpecificData modelData, DatumReaderCustomization customization, boolean failFast, boolean runtimeClassGenerationDone) { this.customization = customization == null ? DatumReaderCustomization.DEFAULT_DATUM_READER_CUSTOMIZATION : customization; this.customizedDatumReader = Utils.isAvro14() ? - new CustomizedSpecificDatumReaderForAvro14(writerSchema, readerSchema, this.customization) : + new CustomizedSpecificDatumReaderForAvro14<>(writerSchema, readerSchema, this.customization) : new CustomizedSpecificDatumReader<>(writerSchema, readerSchema, modelData, this.customization); this.failFast = failFast; this.runtimeClassGenerationDone = runtimeClassGenerationDone; } + @Override + public void setSchema(Schema writerSchema) { + this.customizedDatumReader.setSchema(writerSchema); + } + @Override public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException { if (failFast) { @@ -100,13 +101,18 @@ public FastDeserializerWithAvroGenericImpl(Schema writerSchema, Schema readerSch GenericData modelData, DatumReaderCustomization customization, boolean failFast, boolean runtimeClassGenerationDone) { this.customization = customization == null ? DatumReaderCustomization.DEFAULT_DATUM_READER_CUSTOMIZATION : customization; this.customizedDatumReader = Utils.isAvro14() ? - new CustomizedGenericDatumReaderForAvro14(writerSchema, readerSchema, this.customization) : + new CustomizedGenericDatumReaderForAvro14<>(writerSchema, readerSchema, this.customization) : new CustomizedGenericDatumReader<>(writerSchema, readerSchema, modelData, this.customization); this.failFast = failFast; this.runtimeClassGenerationDone = runtimeClassGenerationDone; } + @Override + public void setSchema(Schema writerSchema) { + customizedDatumReader.setSchema(writerSchema); + } + @Override public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException { if (failFast) { diff --git a/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/RandomRecordGenerator.java b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/RandomRecordGenerator.java index d6eb624f4..ed4259399 100644 --- a/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/RandomRecordGenerator.java +++ b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/RandomRecordGenerator.java @@ -360,7 +360,7 @@ private T instantiate(Class clazz, Schema of) { //TODO - look for both old and new SchemaConstructable ctrs 1st try { Constructor noArgCtr = clazz.getDeclaredConstructor(NO_ARGS); - return noArgCtr.newInstance(NO_ARGS); + return noArgCtr.newInstance(); } catch (Exception e) { throw new IllegalStateException("while trying to instantiate a(n) " + clazz.getName(), e); }