Skip to content

Commit

Permalink
Merge pull request #174 from miguno/feature/avro-compression
Browse files Browse the repository at this point in the history
Add compression support to bijection-avro
  • Loading branch information
johnynek committed Aug 14, 2014
2 parents d0d45ba + 511db86 commit f8c12ac
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package com.twitter.bijection.avro

import com.twitter.bijection.Injection
import org.apache.avro.specific.{ SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase }
import org.apache.avro.file.{ DataFileStream, DataFileWriter }
import org.apache.avro.file.{ CodecFactory, DataFileStream, DataFileWriter }
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import com.twitter.bijection.Inversion.attempt
import com.twitter.bijection.Attempt
Expand All @@ -42,6 +42,59 @@ object SpecificAvroCodecs {
new SpecificAvroCodec[T](klass)
}

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the provided codec.
* @param codecFactory codec with which the data is being compressed
* @tparam T compiled Avro record
* @return Injection
*/
def withCompression[T <: SpecificRecordBase: Manifest](codecFactory: CodecFactory): Injection[T, Array[Byte]] = {
val klass = manifest[T].erasure.asInstanceOf[Class[T]]
new SpecificAvroCodec[T](klass, Some(codecFactory))
}

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the Bzip2 codec.
* @tparam T compiled Avro record
* @return Injection
*/
def withBzip2Compression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] =
withCompression(CodecFactory.bzip2Codec())

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the Deflate codec.
* @param compressionLevel Compression level should be between 1 and 9, inclusive. Higher values result in better
* compression at the expense of encoding speed.
* @tparam T compiled Avro record
* @return Injection
*/
def withDeflateCompression[T <: SpecificRecordBase: Manifest](compressionLevel: Int): Injection[T, Array[Byte]] = {
require(1 <= compressionLevel && compressionLevel <= 9, "Compression level should be between 1 and 9, inclusive")
withCompression(CodecFactory.deflateCodec(compressionLevel))
}

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the Deflate codec and a default compression level of 5.
* @tparam T compiled Avro record
* @return Injection
*/
// Allows to create deflate-compressing Injection's without requiring parentheses similar to `apply`,
// `withSnappyCompression`, etc. to achieve API consistency.
def withDeflateCompression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] = withDeflateCompression(5)

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the Snappy codec.
* @tparam T compiled Avro record
* @return Injection
*/
def withSnappyCompression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] =
withCompression(CodecFactory.snappyCodec())

/**
* Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.BinaryEncoder
* @tparam T compiled Avro record
Expand Down Expand Up @@ -79,6 +132,47 @@ object GenericAvroCodecs {
new GenericAvroCodec[T](schema)
}

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the provided codec.
* @param codecFactory codec with which the data is being compressed
* @tparam T generic record
* @return Injection
*/
def withCompression[T <: GenericRecord: Manifest](schema: Schema, codecFactory: CodecFactory): Injection[T, Array[Byte]] =
new GenericAvroCodec[T](schema, Some(codecFactory))

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the Bzip2 codec.
* @tparam T generic record
* @return Injection
*/
def withBzip2Compression[T <: GenericRecord: Manifest](schema: Schema): Injection[T, Array[Byte]] =
withCompression(schema, CodecFactory.bzip2Codec())

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the Deflate codec.
* @param compressionLevel Compression level should be between 1 and 9, inclusive. Higher values result in better
* compression at the expense of encoding speed. Default compression level is 5.
* @tparam T generic record
* @return Injection
*/
def withDeflateCompression[T <: GenericRecord: Manifest](schema: Schema, compressionLevel: Int = 5): Injection[T, Array[Byte]] = {
require(1 <= compressionLevel && compressionLevel <= 9, "Compression level should be between 1 and 9, inclusive")
withCompression(schema, CodecFactory.deflateCodec(compressionLevel))
}

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader. Data is compressed with the Snappy codec.
* @tparam T generic record
* @return Injection
*/
def withSnappyCompression[T <: GenericRecord: Manifest](schema: Schema): Injection[T, Array[Byte]] =
withCompression(schema, CodecFactory.snappyCodec())

/**
* Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.BinaryEncoder
* @tparam T GenericRecord
Expand Down Expand Up @@ -108,10 +202,14 @@ object GenericAvroCodecs {
* @param klass class of complied record
* @tparam T compiled record
*/
class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T]) extends Injection[T, Array[Byte]] {
class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T], codecFactory: Option[CodecFactory] = None) extends Injection[T, Array[Byte]] {
def apply(a: T): Array[Byte] = {
val writer = new SpecificDatumWriter[T](a.getSchema)
val fileWriter = new DataFileWriter[T](writer)
codecFactory match {
case Some(cf) => fileWriter.setCodec(cf)
case None =>
}
val stream = new ByteArrayOutputStream()
fileWriter.create(a.getSchema, stream)
fileWriter.append(a)
Expand All @@ -134,10 +232,14 @@ class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T]) extends Inject
* @param schema avro schema
* @tparam T generic record
*/
class GenericAvroCodec[T <: GenericRecord](schema: Schema) extends Injection[T, Array[Byte]] {
class GenericAvroCodec[T <: GenericRecord](schema: Schema, codecFactory: Option[CodecFactory] = None) extends Injection[T, Array[Byte]] {
def apply(a: T): Array[Byte] = {
val writer = new GenericDatumWriter[T](a.getSchema)
val fileWriter = new DataFileWriter[T](writer)
codecFactory match {
case Some(cf) => fileWriter.setCodec(cf)
case None =>
}
val stream = new ByteArrayOutputStream()
fileWriter.create(a.getSchema, stream)
fileWriter.append(a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,46 @@ object GenericAvroCodecsSpecification extends Specification with BaseProperties
attempt.get must_== testRecord
}

"Round trip generic record using Generic Injection with Bzip2 compression" in {
implicit val genericInjection = GenericAvroCodecs.withBzip2Compression[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip generic record using Generic Injection with Deflate compression (default compression level)" in {
implicit val genericInjection = GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip generic record using Generic Injection with Deflate compression (custom compression level)" in {
implicit val genericInjection = GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema, 9)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Cannot create Generic Injection with Deflate compression if compression level is set too low" in {
GenericAvroCodecs.withDeflateCompression[FiscalRecord](testSchema, 0) must throwA[IllegalArgumentException]
}

"Cannot create Generic Injection with Deflate compression if compression level is set too high" in {
GenericAvroCodecs.withDeflateCompression[FiscalRecord](testSchema, 10) must throwA[IllegalArgumentException]
}

"Round trip generic record using Generic Injection with Snappy compression" in {
implicit val genericInjection = GenericAvroCodecs.withSnappyCompression[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip generic record using Binary Injection" in {
implicit val genericBinaryInjection = GenericAvroCodecs.toBinary[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,46 @@ object SpecificAvroCodecsSpecification extends Specification with BaseProperties
attempt.get must_== testRecord
}

"Round trip specific record using Specific Injection with Bzip2 compression" in {
implicit val specificInjection = SpecificAvroCodecs.withBzip2Compression[FiscalRecord]
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[FiscalRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip specific record using Specific Injection with Deflate compression (default compression level)" in {
implicit val specificInjection = SpecificAvroCodecs.withDeflateCompression[FiscalRecord]
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[FiscalRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip specific record using Specific Injection with Deflate compression (custom compression level)" in {
implicit val specificInjection = SpecificAvroCodecs.withDeflateCompression[FiscalRecord](9)
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[FiscalRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Cannot create Specific Injection with Deflate compression if compression level is set too low" in {
SpecificAvroCodecs.withDeflateCompression[FiscalRecord](0) must throwA[IllegalArgumentException]
}

"Cannot create Specific Injection with Deflate compression if compression level is set too high" in {
SpecificAvroCodecs.withDeflateCompression[FiscalRecord](10) must throwA[IllegalArgumentException]
}

"Round trip specific record using Specific Injection with Snappy compression" in {
implicit val specificInjection = SpecificAvroCodecs.withSnappyCompression[FiscalRecord]
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[FiscalRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip specific record using Binary Injection" in {
implicit val specificBinaryInjection = SpecificAvroCodecs.toBinary[FiscalRecord]
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
Expand Down

0 comments on commit f8c12ac

Please sign in to comment.