From 5afb3e87c2703b854070715cd3358a57140f51a4 Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Sat, 5 Oct 2013 20:42:38 -0500 Subject: [PATCH 1/3] upgraded to Avro 1.7.5. Created separate objects for Specific and Generic codecs to accommodate class hierarchy change in 1.7.5 --- .../twitter/bijection/avro/AvroCodecs.scala | 76 +++++++------------ ...cLaws.scala => GenericAvroCodecLaws.scala} | 40 +--------- ...a => GenericAvroCodecsSpecification.scala} | 46 ++--------- .../avro/SpecificAvroCodecLaws.scala | 71 +++++++++++++++++ .../SpecificAvroCodecsSpecification.scala | 74 ++++++++++++++++++ project/Build.scala | 2 +- 6 files changed, 185 insertions(+), 124 deletions(-) rename bijection-avro/src/test/scala/com/twitter/bijection/avro/{AvroCodecLaws.scala => GenericAvroCodecLaws.scala} (70%) rename bijection-avro/src/test/scala/com/twitter/bijection/avro/{AvroCodecsSpecification.scala => GenericAvroCodecsSpecification.scala} (65%) create mode 100644 bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecLaws.scala create mode 100644 bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala diff --git a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala index a4641196f..e548c0fd5 100644 --- a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala +++ b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala @@ -17,19 +17,20 @@ package com.twitter.bijection.avro import com.twitter.bijection.Injection import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase} import org.apache.avro.file.{DataFileStream, DataFileWriter} -import java.io.{UnsupportedEncodingException, ByteArrayInputStream, ByteArrayOutputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import com.twitter.bijection.Inversion.attempt import com.twitter.bijection.Attempt import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord} import org.apache.avro.Schema import org.apache.avro.io.{DecoderFactory, DatumReader, EncoderFactory, DatumWriter} +import Injection.utf8 /** * Factory providing various avro injections. * @author Muhammad Ashraf * @since 7/4/13 */ -object AvroCodecs { +object SpecificAvroCodecs { /** * Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and * SpecificDatumReader @@ -41,27 +42,40 @@ object AvroCodecs { new SpecificAvroCodec[T](klass) } - /** - * Returns Injection capable of serializing and deserializing a generic record using GenericDatumReader and - * GenericDatumReader - * @tparam T generic record + * Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.BinaryEncoder + * @tparam T compiled Avro record * @return Injection */ - def apply[T <: GenericRecord](schema: Schema): Injection[T, Array[Byte]] = { - new GenericAvroCodec[T](schema) + def toBinary[T <: SpecificRecordBase : Manifest]: Injection[T, Array[Byte]] = { + val klass = manifest[T].erasure.asInstanceOf[Class[T]] + val writer = new SpecificDatumWriter[T](klass) + val reader = new SpecificDatumReader[T](klass) + new BinaryAvroCodec[T](writer, reader) } /** - * Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.BinaryEncoder + * Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder * @tparam T compiled Avro record * @return Injection */ - def toBinary[T <: SpecificRecordBase : Manifest]: Injection[T, Array[Byte]] = { + def toJson[T <: SpecificRecordBase : Manifest](schema: Schema): Injection[T, String] = { val klass = manifest[T].erasure.asInstanceOf[Class[T]] val writer = new SpecificDatumWriter[T](klass) val reader = new SpecificDatumReader[T](klass) - new BinaryAvroCodec[T](writer, reader) + new JsonAvroCodec[T](schema, writer, reader) + } +} + +object GenericAvroCodecs { + /** + * Returns Injection capable of serializing and deserializing a generic record using GenericDatumReader and + * GenericDatumReader + * @tparam T generic record + * @return Injection + */ + def apply[T <: GenericRecord](schema: Schema): Injection[T, Array[Byte]] = { + new GenericAvroCodec[T](schema) } /** @@ -85,18 +99,6 @@ object AvroCodecs { val reader = new GenericDatumReader[T](schema) new JsonAvroCodec[T](schema, writer, reader) } - - /** - * Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder - * @tparam T compiled Avro record - * @return Injection - */ - def toJson[T <: SpecificRecordBase : Manifest](schema: Schema): Injection[T, String] = { - val klass = manifest[T].erasure.asInstanceOf[Class[T]] - val writer = new SpecificDatumWriter[T](klass) - val reader = new SpecificDatumReader[T](klass) - new JsonAvroCodec[T](schema, writer, reader) - } } /** @@ -185,35 +187,13 @@ class JsonAvroCodec[T](schema: Schema, writer: DatumWriter[T], reader: DatumRead val encoder = EncoderFactory.get().jsonEncoder(schema, stream) writer.write(a, encoder) encoder.flush() - newUtf8(stream.toByteArray) + Injection.invert[String, Array[Byte]](stream.toByteArray).get } def invert(str: String): Attempt[T] = attempt(str) { str => - val decoder = DecoderFactory.get().jsonDecoder(schema, new ByteArrayInputStream(getBytesUtf8(str))) + val decoder = DecoderFactory.get().jsonDecoder(schema, new ByteArrayInputStream(Injection[String, Array[Byte]](str))) reader.read(null.asInstanceOf[T], decoder) } - - private def newUtf8(bytes: Array[Byte]): String = { - try { - if (bytes == null) null else new String(bytes, "UTF-8") - } - catch { - case uee: UnsupportedEncodingException => { - throw new RuntimeException("UTF-8 Not supported on this platform") - } - } - } - - private def getBytesUtf8(string: String): Array[Byte] = { - try { - if (string == null) null else string.getBytes("UTF-8") - } - catch { - case uee: UnsupportedEncodingException => { - throw new RuntimeException("UTF-8 Not supported on this platform") - } - } - } - } + diff --git a/bijection-avro/src/test/scala/com/twitter/bijection/avro/AvroCodecLaws.scala b/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecLaws.scala similarity index 70% rename from bijection-avro/src/test/scala/com/twitter/bijection/avro/AvroCodecLaws.scala rename to bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecLaws.scala index 40c6214c0..ed4b58d2c 100644 --- a/bijection-avro/src/test/scala/com/twitter/bijection/avro/AvroCodecLaws.scala +++ b/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecLaws.scala @@ -18,13 +18,12 @@ import com.twitter.bijection.{BaseProperties, Injection} import org.scalacheck.Properties import org.apache.avro.generic.{GenericRecord, GenericData} import org.apache.avro.Schema -import avro.FiscalRecord /** * @author Muhammad Ashraf * @since 7/5/13 */ -object AvroCodecLaws extends Properties("AvroCodecs") with BaseProperties { +object GenericAvroCodecLaws extends Properties("GenericAvroCodecs") with BaseProperties { val testSchema = new Schema.Parser().parse( """{ "type":"record", "name":"FiscalRecord", @@ -51,15 +50,6 @@ object AvroCodecLaws extends Properties("AvroCodecs") with BaseProperties { ] }""") - - def buildSpecificAvroRecord(i: (String, Int, Int)): FiscalRecord = { - FiscalRecord.newBuilder() - .setCalendarDate(i._1) - .setFiscalWeek(i._2) - .setFiscalYear(i._3) - .build() - } - def buildGenericAvroRecord(i: (String, Int, Int)): GenericRecord = { val fiscalRecord = new GenericData.Record(testSchema) @@ -69,48 +59,26 @@ object AvroCodecLaws extends Properties("AvroCodecs") with BaseProperties { fiscalRecord } - implicit val testSpecificRecord = arbitraryViaFn { - is: (String, Int, Int) => buildSpecificAvroRecord(is) - } - implicit val testGenericRecord = arbitraryViaFn { is: (String, Int, Int) => buildGenericAvroRecord(is) } - def roundTripsSpecificRecord(implicit injection: Injection[FiscalRecord, Array[Byte]]) = { - isLooseInjection[FiscalRecord, Array[Byte]] - } - def roundTripsGenericRecord(implicit injection: Injection[GenericRecord, Array[Byte]]) = { isLooseInjection[GenericRecord, Array[Byte]] } - def roundTripsSpecificRecordToJson(implicit injection: Injection[FiscalRecord, String]) = { - isLooseInjection[FiscalRecord, String] - } - def roundTripsGenericRecordToJson(implicit injection: Injection[GenericRecord, String]) = { isLooseInjection[GenericRecord, String] } - property("round trips Specific Record -> Array[Byte]") = - roundTripsSpecificRecord(AvroCodecs[FiscalRecord]) - property("round trips Generic Record -> Array[Byte]") = - roundTripsGenericRecord(AvroCodecs[GenericRecord](testSchema)) - - property("round trips Specific Record -> Array[Byte] using Binary Encoder/Decoder") = - roundTripsSpecificRecord(AvroCodecs.toBinary[FiscalRecord]) + roundTripsGenericRecord(GenericAvroCodecs[GenericRecord](testSchema)) property("round trips Generic Record -> Array[Byte] using Binary Encoder/Decoder") = - roundTripsGenericRecord(AvroCodecs.toBinary[GenericRecord](testSchema)) + roundTripsGenericRecord(GenericAvroCodecs.toBinary[GenericRecord](testSchema)) property("round trips Generic Record -> String using Json Encoder/Decoder") = - roundTripsGenericRecordToJson(AvroCodecs.toJson[GenericRecord](testSchema)) - - property("round trips Specific Record -> String using Json Encoder/Decoder") = - roundTripsSpecificRecordToJson(AvroCodecs.toJson[FiscalRecord](testSchema)) - + roundTripsGenericRecordToJson(GenericAvroCodecs.toJson[GenericRecord](testSchema)) } diff --git a/bijection-avro/src/test/scala/com/twitter/bijection/avro/AvroCodecsSpecification.scala b/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala similarity index 65% rename from bijection-avro/src/test/scala/com/twitter/bijection/avro/AvroCodecsSpecification.scala rename to bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala index de0c41a07..b500c6259 100644 --- a/bijection-avro/src/test/scala/com/twitter/bijection/avro/AvroCodecsSpecification.scala +++ b/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala @@ -24,7 +24,7 @@ import org.apache.avro.generic.{GenericData, GenericRecord} * @author Muhammad Ashraf * @since 7/6/13 */ -object AvroCodecsSpecification extends Specification with BaseProperties { +object GenericAvroCodecsSpecification extends Specification with BaseProperties { val testSchema = new Schema.Parser().parse( """{ "type":"record", "name":"FiscalRecord", @@ -52,50 +52,26 @@ object AvroCodecsSpecification extends Specification with BaseProperties { }""") - "Avro codec" should { - - "Round trip specific record using Specific Injection" in { - implicit val specificInjection = AvroCodecs[FiscalRecord] - val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) - val bytes = Injection[FiscalRecord, Array[Byte]](testRecord) - val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) - attempt.get must_== testRecord - } - - "Round trip specific record using Binary Injection" in { - implicit val specificBinaryInjection = AvroCodecs.toBinary[FiscalRecord] - val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) - val bytes = Injection[FiscalRecord, Array[Byte]](testRecord) - val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) - attempt.get must_== testRecord - } - - "Round trip specific record using Json Injection" in { - implicit val specificJsonInjection = AvroCodecs.toJson[FiscalRecord](testSchema) - val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) - val jsonString = Injection[FiscalRecord, String](testRecord) - val attempt = Injection.invert[FiscalRecord, String](jsonString) - attempt.get must_== testRecord - } + "Generic Avro codec" should { "Round trip generic record using Generic Injection" in { - implicit val genericInjection = AvroCodecs[GenericRecord](testSchema) + implicit val genericInjection = GenericAvroCodecs[GenericRecord](testSchema) val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) val bytes = Injection[GenericRecord, Array[Byte]](testRecord) val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes) attempt.get must_== testRecord } - "Round trip specific record using Binary Injection" in { - implicit val genericBinaryInjection = AvroCodecs.toBinary[GenericRecord](testSchema) + "Round trip generic record using Binary Injection" in { + implicit val genericBinaryInjection = GenericAvroCodecs.toBinary[GenericRecord](testSchema) val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) val bytes = Injection[GenericRecord, Array[Byte]](testRecord) val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes) attempt.get must_== testRecord } - "Round trip specific record using Json Injection" in { - implicit val genericJsonInjection = AvroCodecs.toJson[GenericRecord](testSchema) + "Round trip generic record using Json Injection" in { + implicit val genericJsonInjection = GenericAvroCodecs.toJson[GenericRecord](testSchema) val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) val jsonString = Injection[GenericRecord, String](testRecord) val attempt = Injection.invert[GenericRecord, String](jsonString) @@ -103,14 +79,6 @@ object AvroCodecsSpecification extends Specification with BaseProperties { } } - def buildSpecificAvroRecord(i: (String, Int, Int)): FiscalRecord = { - FiscalRecord.newBuilder() - .setCalendarDate(i._1) - .setFiscalWeek(i._2) - .setFiscalYear(i._3) - .build() - } - def buildGenericAvroRecord(i: (String, Int, Int)): GenericRecord = { val fiscalRecord = new GenericData.Record(testSchema) diff --git a/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecLaws.scala b/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecLaws.scala new file mode 100644 index 000000000..78ddcee98 --- /dev/null +++ b/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecLaws.scala @@ -0,0 +1,71 @@ +package com.twitter.bijection.avro + +import org.scalacheck.Properties +import com.twitter.bijection.{Injection, BaseProperties} +import org.apache.avro.Schema +import avro.FiscalRecord + +/** + * @author Muhammad Ashraf + * @since 10/5/13 + */ +object SpecificAvroCodecLaws extends Properties("SpecificAvroCodecs") with BaseProperties { + val testSchema = new Schema.Parser().parse( """{ + "type":"record", + "name":"FiscalRecord", + "namespace":"avro", + "fields":[ + { + "name":"calendarDate", + "type":"string" + }, + { + "name":"fiscalWeek", + "type":[ + "int", + "null" + ] + }, + { + "name":"fiscalYear", + "type":[ + "int", + "null" + ] + } + ] + }""") + + + def buildSpecificAvroRecord(i: (String, Int, Int)): FiscalRecord = { + FiscalRecord.newBuilder() + .setCalendarDate(i._1) + .setFiscalWeek(i._2) + .setFiscalYear(i._3) + .build() + } + + implicit val testSpecificRecord = arbitraryViaFn { + is: (String, Int, Int) => buildSpecificAvroRecord(is) + } + + def roundTripsSpecificRecord(implicit injection: Injection[FiscalRecord, Array[Byte]]) = { + isLooseInjection[FiscalRecord, Array[Byte]] + } + + def roundTripsSpecificRecordToJson(implicit injection: Injection[FiscalRecord, String]) = { + isLooseInjection[FiscalRecord, String] + } + + property("round trips Specific Record -> Array[Byte]") = + roundTripsSpecificRecord(SpecificAvroCodecs[FiscalRecord]) + + property("round trips Specific Record -> Array[Byte] using Binary Encoder/Decoder") = + roundTripsSpecificRecord(SpecificAvroCodecs.toBinary[FiscalRecord]) + + property("round trips Specific Record -> String using Json Encoder/Decoder") = + roundTripsSpecificRecordToJson(SpecificAvroCodecs.toJson[FiscalRecord](testSchema)) + +} + + diff --git a/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala b/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala new file mode 100644 index 000000000..11e18fae7 --- /dev/null +++ b/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala @@ -0,0 +1,74 @@ +package com.twitter.bijection.avro + +import org.specs.Specification +import com.twitter.bijection.{Injection, BaseProperties} +import org.apache.avro.Schema +import avro.FiscalRecord + +/** + * @author Muhammad Ashraf + * @since 10/5/13 + */ +object SpecificAvroCodecsSpecification extends Specification with BaseProperties { + val testSchema = new Schema.Parser().parse( """{ + "type":"record", + "name":"FiscalRecord", + "namespace":"avro", + "fields":[ + { + "name":"calendarDate", + "type":"string" + }, + { + "name":"fiscalWeek", + "type":[ + "int", + "null" + ] + }, + { + "name":"fiscalYear", + "type":[ + "int", + "null" + ] + } + ] + }""") + + + "Avro codec" should { + + "Round trip specific record using Specific Injection" in { + implicit val specificInjection = SpecificAvroCodecs[FiscalRecord] + val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[FiscalRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Round trip specific record using Binary Injection" in { + implicit val specificBinaryInjection = SpecificAvroCodecs.toBinary[FiscalRecord] + val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[FiscalRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Round trip specific record using Json Injection" in { + implicit val specificJsonInjection = SpecificAvroCodecs.toJson[FiscalRecord](testSchema) + val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) + val jsonString = Injection[FiscalRecord, String](testRecord) + val attempt = Injection.invert[FiscalRecord, String](jsonString) + attempt.get must_== testRecord + } + } + + def buildSpecificAvroRecord(i: (String, Int, Int)): FiscalRecord = { + FiscalRecord.newBuilder() + .setCalendarDate(i._1) + .setFiscalWeek(i._2) + .setFiscalYear(i._3) + .build() + } +} diff --git a/project/Build.scala b/project/Build.scala index 4bfe25c10..b96cd515e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -217,7 +217,7 @@ object BijectionBuild extends Build { lazy val bijectionAvro = module("avro").settings( osgiExportAll("com.twitter.bijection.avro"), libraryDependencies ++= Seq( - "org.apache.avro" % "avro" % "1.7.4" + "org.apache.avro" % "avro" % "1.7.5" ) ).dependsOn(bijectionCore % "test->test;compile->compile") From 1fc901ec92b5023d47721286e0ab6a151f7fec17 Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Tue, 8 Oct 2013 20:10:37 -0500 Subject: [PATCH 2/3] adds comments indicating usages of UTF-8 String in Avro Json Bijection --- .../com/twitter/bijection/avro/AvroCodecs.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala index e548c0fd5..2e51aa737 100644 --- a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala +++ b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala @@ -55,7 +55,8 @@ object SpecificAvroCodecs { } /** - * Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder + * Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder to a + * UTF-8 String * @tparam T compiled Avro record * @return Injection */ @@ -89,6 +90,18 @@ object GenericAvroCodecs { new BinaryAvroCodec[T](writer, reader) } + /** + * Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder to a + * UTF-8 String + * @tparam T compiled Avro record + * @return Injection + */ + def toJson[T <: GenericRecord](schema: Schema): Injection[T, String] = { + val writer = new GenericDatumWriter[T](schema) + val reader = new GenericDatumReader[T](schema) + new JsonAvroCodec[T](schema, writer, reader) + } + /** * Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.JsonEncoder * @tparam T compiled Avro record @@ -177,9 +190,11 @@ class BinaryAvroCodec[T](writer: DatumWriter[T], reader: DatumReader[T]) extends /** * Provides methods to serializing and deserializing a generic and compiled avro record using org.apache.avro.io.JsonEncoder + * to a UTF-8 String * @param writer Datum writer * @param reader Datum reader * @tparam T avro record + * @throws RuntimeException if Avro Records cannot be converted to a UTF-8 String */ class JsonAvroCodec[T](schema: Schema, writer: DatumWriter[T], reader: DatumReader[T]) extends Injection[T, String] { def apply(a: T): String = { From af0d8f0e5997113f23413c784ca32bf7f3de81dc Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Wed, 9 Oct 2013 21:48:01 -0500 Subject: [PATCH 3/3] fixed broken build --- .../scala/com/twitter/bijection/avro/AvroCodecs.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala index 2e51aa737..fa034d1af 100644 --- a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala +++ b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala @@ -101,17 +101,6 @@ object GenericAvroCodecs { val reader = new GenericDatumReader[T](schema) new JsonAvroCodec[T](schema, writer, reader) } - - /** - * Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.JsonEncoder - * @tparam T compiled Avro record - * @return Injection - */ - def toJson[T <: GenericRecord](schema: Schema): Injection[T, String] = { - val writer = new GenericDatumWriter[T](schema) - val reader = new GenericDatumReader[T](schema) - new JsonAvroCodec[T](schema, writer, reader) - } } /**