From 511db865f95965b67c254e9ac959541c03b18b9c Mon Sep 17 00:00:00 2001 From: "Michael G. Noll" Date: Mon, 11 Aug 2014 11:11:21 +0200 Subject: [PATCH] Add compression support to bijection-avro --- .../twitter/bijection/avro/AvroCodecs.scala | 108 +++++++++++++++++- .../avro/GenericAvroCodecsSpecification.scala | 40 +++++++ .../SpecificAvroCodecsSpecification.scala | 40 +++++++ 3 files changed, 185 insertions(+), 3 deletions(-) diff --git a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala index c73fe88dd..54e70e1c0 100644 --- a/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala +++ b/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala @@ -16,7 +16,7 @@ package com.twitter.bijection.avro import com.twitter.bijection.Injection import org.apache.avro.specific.{ SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase } -import org.apache.avro.file.{ DataFileStream, DataFileWriter } +import org.apache.avro.file.{ CodecFactory, DataFileStream, DataFileWriter } import java.io.{ ByteArrayInputStream, ByteArrayOutputStream } import com.twitter.bijection.Inversion.attempt import com.twitter.bijection.Attempt @@ -42,6 +42,59 @@ object SpecificAvroCodecs { new SpecificAvroCodec[T](klass) } + /** + * Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and + * SpecificDatumReader. Data is compressed with the provided codec. 
+ * @param codecFactory codec with which the data is being compressed + * @tparam T compiled Avro record + * @return Injection + */ + def withCompression[T <: SpecificRecordBase: Manifest](codecFactory: CodecFactory): Injection[T, Array[Byte]] = { + val klass = manifest[T].erasure.asInstanceOf[Class[T]] + new SpecificAvroCodec[T](klass, Some(codecFactory)) + } + + /** + * Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and + * SpecificDatumReader. Data is compressed with the Bzip2 codec. + * @tparam T compiled Avro record + * @return Injection + */ + def withBzip2Compression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] = + withCompression(CodecFactory.bzip2Codec()) + + /** + * Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and + * SpecificDatumReader. Data is compressed with the Deflate codec. + * @param compressionLevel Compression level should be between 1 and 9, inclusive. Higher values result in better + * compression at the expense of encoding speed. + * @tparam T compiled Avro record + * @return Injection + */ + def withDeflateCompression[T <: SpecificRecordBase: Manifest](compressionLevel: Int): Injection[T, Array[Byte]] = { + require(1 <= compressionLevel && compressionLevel <= 9, "Compression level should be between 1 and 9, inclusive") + withCompression(CodecFactory.deflateCodec(compressionLevel)) + } + + /** + * Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and + * SpecificDatumReader. Data is compressed with the Deflate codec and a default compression level of 5. + * @tparam T compiled Avro record + * @return Injection + */ + // Allows creating deflate-compressing Injections without requiring parentheses, similar to `apply`, + // `withSnappyCompression`, etc., for API consistency. 
+ def withDeflateCompression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] = withDeflateCompression(5) + + /** + * Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and + * SpecificDatumReader. Data is compressed with the Snappy codec. + * @tparam T compiled Avro record + * @return Injection + */ + def withSnappyCompression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] = + withCompression(CodecFactory.snappyCodec()) + /** * Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.BinaryEncoder * @tparam T compiled Avro record @@ -79,6 +132,47 @@ object GenericAvroCodecs { new GenericAvroCodec[T](schema) } + /** + * Returns Injection capable of serializing and deserializing a generic Avro record using GenericDatumWriter and + * GenericDatumReader. Data is compressed with the provided codec. + * @param codecFactory codec with which the data is being compressed + * @tparam T generic record + * @return Injection + */ + def withCompression[T <: GenericRecord: Manifest](schema: Schema, codecFactory: CodecFactory): Injection[T, Array[Byte]] = + new GenericAvroCodec[T](schema, Some(codecFactory)) + + /** + * Returns Injection capable of serializing and deserializing a generic Avro record using GenericDatumWriter and + * GenericDatumReader. Data is compressed with the Bzip2 codec. + * @tparam T generic record + * @return Injection + */ + def withBzip2Compression[T <: GenericRecord: Manifest](schema: Schema): Injection[T, Array[Byte]] = + withCompression(schema, CodecFactory.bzip2Codec()) + + /** + * Returns Injection capable of serializing and deserializing a generic Avro record using GenericDatumWriter and + * GenericDatumReader. Data is compressed with the Deflate codec. + * @param compressionLevel Compression level should be between 1 and 9, inclusive. 
Higher values result in better + * compression at the expense of encoding speed. Default compression level is 5. + * @tparam T generic record + * @return Injection + */ + def withDeflateCompression[T <: GenericRecord: Manifest](schema: Schema, compressionLevel: Int = 5): Injection[T, Array[Byte]] = { + require(1 <= compressionLevel && compressionLevel <= 9, "Compression level should be between 1 and 9, inclusive") + withCompression(schema, CodecFactory.deflateCodec(compressionLevel)) + } + + /** + * Returns Injection capable of serializing and deserializing a generic Avro record using GenericDatumWriter and + * GenericDatumReader. Data is compressed with the Snappy codec. + * @tparam T generic record + * @return Injection + */ + def withSnappyCompression[T <: GenericRecord: Manifest](schema: Schema): Injection[T, Array[Byte]] = + withCompression(schema, CodecFactory.snappyCodec()) + /** * Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.BinaryEncoder * @tparam T GenericRecord @@ -108,10 +202,14 @@ object GenericAvroCodecs { * @param klass class of complied record * @tparam T compiled record */ -class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T]) extends Injection[T, Array[Byte]] { +class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T], codecFactory: Option[CodecFactory] = None) extends Injection[T, Array[Byte]] { def apply(a: T): Array[Byte] = { val writer = new SpecificDatumWriter[T](a.getSchema) val fileWriter = new DataFileWriter[T](writer) + codecFactory match { + case Some(cf) => fileWriter.setCodec(cf) + case None => + } val stream = new ByteArrayOutputStream() fileWriter.create(a.getSchema, stream) fileWriter.append(a) @@ -134,10 +232,14 @@ class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T]) extends Inject * @param schema avro schema * @tparam T generic record */ -class GenericAvroCodec[T <: GenericRecord](schema: Schema) extends Injection[T, Array[Byte]] { 
+class GenericAvroCodec[T <: GenericRecord](schema: Schema, codecFactory: Option[CodecFactory] = None) extends Injection[T, Array[Byte]] { def apply(a: T): Array[Byte] = { val writer = new GenericDatumWriter[T](a.getSchema) val fileWriter = new DataFileWriter[T](writer) + codecFactory match { + case Some(cf) => fileWriter.setCodec(cf) + case None => + } val stream = new ByteArrayOutputStream() fileWriter.create(a.getSchema, stream) fileWriter.append(a) diff --git a/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala b/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala index da22c28d4..23257b492 100644 --- a/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala +++ b/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala @@ -61,6 +61,46 @@ object GenericAvroCodecsSpecification extends Specification with BaseProperties attempt.get must_== testRecord } + "Round trip generic record using Generic Injection with Bzip2 compression" in { + implicit val genericInjection = GenericAvroCodecs.withBzip2Compression[GenericRecord](testSchema) + val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[GenericRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Round trip generic record using Generic Injection with Deflate compression (default compression level)" in { + implicit val genericInjection = GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema) + val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[GenericRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Round trip generic record using Generic Injection with Deflate compression (custom compression level)" 
in { + implicit val genericInjection = GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema, 9) + val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[GenericRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Cannot create Generic Injection with Deflate compression if compression level is set too low" in { + GenericAvroCodecs.withDeflateCompression[FiscalRecord](testSchema, 0) must throwA[IllegalArgumentException] + } + + "Cannot create Generic Injection with Deflate compression if compression level is set too high" in { + GenericAvroCodecs.withDeflateCompression[FiscalRecord](testSchema, 10) must throwA[IllegalArgumentException] + } + + "Round trip generic record using Generic Injection with Snappy compression" in { + implicit val genericInjection = GenericAvroCodecs.withSnappyCompression[GenericRecord](testSchema) + val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[GenericRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + "Round trip generic record using Binary Injection" in { implicit val genericBinaryInjection = GenericAvroCodecs.toBinary[GenericRecord](testSchema) val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12)) diff --git a/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala b/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala index 01ec30b63..18b01622e 100644 --- a/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala +++ b/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala @@ -46,6 +46,46 @@ object SpecificAvroCodecsSpecification extends Specification with BaseProperties attempt.get must_== testRecord } + "Round 
trip specific record using Specific Injection with Bzip2 compression" in { + implicit val specificInjection = SpecificAvroCodecs.withBzip2Compression[FiscalRecord] + val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[FiscalRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Round trip specific record using Specific Injection with Deflate compression (default compression level)" in { + implicit val specificInjection = SpecificAvroCodecs.withDeflateCompression[FiscalRecord] + val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[FiscalRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Round trip specific record using Specific Injection with Deflate compression (custom compression level)" in { + implicit val specificInjection = SpecificAvroCodecs.withDeflateCompression[FiscalRecord](9) + val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[FiscalRecord, Array[Byte]](testRecord) + val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + + "Cannot create Specific Injection with Deflate compression if compression level is set too low" in { + SpecificAvroCodecs.withDeflateCompression[FiscalRecord](0) must throwA[IllegalArgumentException] + } + + "Cannot create Specific Injection with Deflate compression if compression level is set too high" in { + SpecificAvroCodecs.withDeflateCompression[FiscalRecord](10) must throwA[IllegalArgumentException] + } + + "Round trip specific record using Specific Injection with Snappy compression" in { + implicit val specificInjection = SpecificAvroCodecs.withSnappyCompression[FiscalRecord] + val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12)) + val bytes = Injection[FiscalRecord, 
Array[Byte]](testRecord) + val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes) + attempt.get must_== testRecord + } + "Round trip specific record using Binary Injection" in { implicit val specificBinaryInjection = SpecificAvroCodecs.toBinary[FiscalRecord] val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))