Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compression support to bijection-avro #174

Merged
merged 1 commit into from
Aug 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package com.twitter.bijection.avro

import com.twitter.bijection.Injection
import org.apache.avro.specific.{ SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase }
import org.apache.avro.file.{ DataFileStream, DataFileWriter }
import org.apache.avro.file.{ CodecFactory, DataFileStream, DataFileWriter }
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import com.twitter.bijection.Inversion.attempt
import com.twitter.bijection.Attempt
Expand All @@ -42,6 +42,59 @@ object SpecificAvroCodecs {
new SpecificAvroCodec[T](klass)
}

/**
 * Creates an Injection between a compiled Avro record and its serialized bytes, backed by
 * SpecificDatumWriter and SpecificDatumReader. The serialized data is compressed with the supplied codec.
 * @param codecFactory factory of the codec used to compress the data
 * @tparam T compiled Avro record
 * @return Injection
 */
def withCompression[T <: SpecificRecordBase: Manifest](codecFactory: CodecFactory): Injection[T, Array[Byte]] =
  // The Manifest context bound supplies the class of T that the codec constructor expects.
  new SpecificAvroCodec[T](manifest[T].erasure.asInstanceOf[Class[T]], Some(codecFactory))

/**
 * Creates an Injection between a compiled Avro record and its serialized bytes, backed by
 * SpecificDatumWriter and SpecificDatumReader. The serialized data is compressed with the Bzip2 codec.
 * @tparam T compiled Avro record
 * @return Injection
 */
def withBzip2Compression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] = {
  val bzip2 = CodecFactory.bzip2Codec()
  withCompression[T](bzip2)
}

/**
 * Creates an Injection between a compiled Avro record and its serialized bytes, backed by
 * SpecificDatumWriter and SpecificDatumReader. The serialized data is compressed with the Deflate codec.
 * @param compressionLevel Compression level; must lie between 1 and 9, inclusive. Higher values result in better
 *                         compression at the expense of encoding speed.
 * @tparam T compiled Avro record
 * @return Injection
 * @throws IllegalArgumentException if compressionLevel is outside the range 1 to 9
 */
def withDeflateCompression[T <: SpecificRecordBase: Manifest](compressionLevel: Int): Injection[T, Array[Byte]] = {
  // Reject out-of-range levels up front, before any codec is created.
  require(compressionLevel >= 1 && compressionLevel <= 9, "Compression level should be between 1 and 9, inclusive")
  val deflate = CodecFactory.deflateCodec(compressionLevel)
  withCompression[T](deflate)
}

/**
 * Creates an Injection between a compiled Avro record and its serialized bytes, backed by
 * SpecificDatumWriter and SpecificDatumReader. The serialized data is compressed with the Deflate codec
 * at a default compression level of 5.
 *
 * This overload takes no parameter list so that a deflate-compressing Injection can be created without
 * parentheses, consistent with `apply`, `withSnappyCompression`, etc.
 * @tparam T compiled Avro record
 * @return Injection
 */
def withDeflateCompression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] =
  withDeflateCompression[T](5)

/**
 * Creates an Injection between a compiled Avro record and its serialized bytes, backed by
 * SpecificDatumWriter and SpecificDatumReader. The serialized data is compressed with the Snappy codec.
 * @tparam T compiled Avro record
 * @return Injection
 */
def withSnappyCompression[T <: SpecificRecordBase: Manifest]: Injection[T, Array[Byte]] = {
  val snappy = CodecFactory.snappyCodec()
  withCompression[T](snappy)
}

/**
* Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.BinaryEncoder
* @tparam T compiled Avro record
Expand Down Expand Up @@ -79,6 +132,47 @@ object GenericAvroCodecs {
new GenericAvroCodec[T](schema)
}

/**
 * Returns Injection capable of serializing and deserializing a generic Avro record through GenericAvroCodec.
 * Data is compressed with the provided codec.
 * @param schema Avro schema handed to the underlying GenericAvroCodec
 * @param codecFactory codec with which the data is being compressed
 * @tparam T generic record
 * @return Injection
 */
def withCompression[T <: GenericRecord: Manifest](schema: Schema, codecFactory: CodecFactory): Injection[T, Array[Byte]] = {
  val compression: Option[CodecFactory] = Some(codecFactory)
  new GenericAvroCodec[T](schema, compression)
}

/**
 * Returns Injection capable of serializing and deserializing a generic Avro record through GenericAvroCodec.
 * Data is compressed with the Bzip2 codec.
 * @param schema Avro schema handed to the underlying GenericAvroCodec
 * @tparam T generic record
 * @return Injection
 */
def withBzip2Compression[T <: GenericRecord: Manifest](schema: Schema): Injection[T, Array[Byte]] = {
  val bzip2 = CodecFactory.bzip2Codec()
  withCompression[T](schema, bzip2)
}

/**
 * Returns Injection capable of serializing and deserializing a generic Avro record through GenericAvroCodec.
 * Data is compressed with the Deflate codec.
 * @param schema Avro schema handed to the underlying GenericAvroCodec
 * @param compressionLevel Compression level; must lie between 1 and 9, inclusive. Higher values result in better
 *                         compression at the expense of encoding speed. Default compression level is 5.
 * @tparam T generic record
 * @return Injection
 * @throws IllegalArgumentException if compressionLevel is outside the range 1 to 9
 */
def withDeflateCompression[T <: GenericRecord: Manifest](schema: Schema, compressionLevel: Int = 5): Injection[T, Array[Byte]] = {
  // Reject out-of-range levels up front, before any codec is created.
  require(compressionLevel >= 1 && compressionLevel <= 9, "Compression level should be between 1 and 9, inclusive")
  val deflate = CodecFactory.deflateCodec(compressionLevel)
  withCompression[T](schema, deflate)
}

/**
 * Returns Injection capable of serializing and deserializing a generic Avro record through GenericAvroCodec.
 * Data is compressed with the Snappy codec.
 * @param schema Avro schema handed to the underlying GenericAvroCodec
 * @tparam T generic record
 * @return Injection
 */
def withSnappyCompression[T <: GenericRecord: Manifest](schema: Schema): Injection[T, Array[Byte]] = {
  val snappy = CodecFactory.snappyCodec()
  withCompression[T](schema, snappy)
}

/**
* Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.BinaryEncoder
* @tparam T GenericRecord
Expand Down Expand Up @@ -108,10 +202,14 @@ object GenericAvroCodecs {
* @param klass class of compiled record
* @tparam T compiled record
*/
class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T]) extends Injection[T, Array[Byte]] {
class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T], codecFactory: Option[CodecFactory] = None) extends Injection[T, Array[Byte]] {
def apply(a: T): Array[Byte] = {
val writer = new SpecificDatumWriter[T](a.getSchema)
val fileWriter = new DataFileWriter[T](writer)
codecFactory match {
case Some(cf) => fileWriter.setCodec(cf)
case None =>
}
val stream = new ByteArrayOutputStream()
fileWriter.create(a.getSchema, stream)
fileWriter.append(a)
Expand All @@ -134,10 +232,14 @@ class SpecificAvroCodec[T <: SpecificRecordBase](klass: Class[T]) extends Inject
* @param schema avro schema
* @tparam T generic record
*/
class GenericAvroCodec[T <: GenericRecord](schema: Schema) extends Injection[T, Array[Byte]] {
class GenericAvroCodec[T <: GenericRecord](schema: Schema, codecFactory: Option[CodecFactory] = None) extends Injection[T, Array[Byte]] {
def apply(a: T): Array[Byte] = {
val writer = new GenericDatumWriter[T](a.getSchema)
val fileWriter = new DataFileWriter[T](writer)
codecFactory match {
case Some(cf) => fileWriter.setCodec(cf)
case None =>
}
val stream = new ByteArrayOutputStream()
fileWriter.create(a.getSchema, stream)
fileWriter.append(a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,46 @@ object GenericAvroCodecsSpecification extends Specification with BaseProperties
attempt.get must_== testRecord
}

// Each round-trip example serializes a generic record through a compressing Injection and checks
// that inverting the resulting bytes yields an equal record.
"Round trip generic record using Generic Injection with Bzip2 compression" in {
  implicit val genericInjection = GenericAvroCodecs.withBzip2Compression[GenericRecord](testSchema)
  val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
  val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
  val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
  attempt.get must_== testRecord
}

"Round trip generic record using Generic Injection with Deflate compression (default compression level)" in {
  // No level is passed, so the factory's default compression level applies.
  implicit val genericInjection = GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema)
  val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
  val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
  val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
  attempt.get must_== testRecord
}

"Round trip generic record using Generic Injection with Deflate compression (custom compression level)" in {
  implicit val genericInjection = GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema, 9)
  val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
  val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
  val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
  attempt.get must_== testRecord
}

// The invalid-level examples use GenericRecord like the other generic examples, so they do not
// depend on a generated specific record class also being a GenericRecord.
"Cannot create Generic Injection with Deflate compression if compression level is set too low" in {
  GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema, 0) must throwA[IllegalArgumentException]
}

"Cannot create Generic Injection with Deflate compression if compression level is set too high" in {
  GenericAvroCodecs.withDeflateCompression[GenericRecord](testSchema, 10) must throwA[IllegalArgumentException]
}

"Round trip generic record using Generic Injection with Snappy compression" in {
  implicit val genericInjection = GenericAvroCodecs.withSnappyCompression[GenericRecord](testSchema)
  val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
  val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
  val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
  attempt.get must_== testRecord
}

"Round trip generic record using Binary Injection" in {
implicit val genericBinaryInjection = GenericAvroCodecs.toBinary[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,46 @@ object SpecificAvroCodecsSpecification extends Specification with BaseProperties
attempt.get must_== testRecord
}

// Each round-trip example serializes a compiled record through a compressing Injection and checks
// that inverting the resulting bytes yields an equal record.
"Round trip specific record using Specific Injection with Bzip2 compression" in {
  implicit val specificInjection: Injection[FiscalRecord, Array[Byte]] =
    SpecificAvroCodecs.withBzip2Compression[FiscalRecord]
  val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
  val encoded = Injection[FiscalRecord, Array[Byte]](testRecord)
  val decoded = Injection.invert[FiscalRecord, Array[Byte]](encoded)
  decoded.get must_== testRecord
}

"Round trip specific record using Specific Injection with Deflate compression (default compression level)" in {
  // The overload without a parameter list selects the default compression level.
  implicit val specificInjection: Injection[FiscalRecord, Array[Byte]] =
    SpecificAvroCodecs.withDeflateCompression[FiscalRecord]
  val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
  val encoded = Injection[FiscalRecord, Array[Byte]](testRecord)
  val decoded = Injection.invert[FiscalRecord, Array[Byte]](encoded)
  decoded.get must_== testRecord
}

"Round trip specific record using Specific Injection with Deflate compression (custom compression level)" in {
  implicit val specificInjection: Injection[FiscalRecord, Array[Byte]] =
    SpecificAvroCodecs.withDeflateCompression[FiscalRecord](9)
  val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
  val encoded = Injection[FiscalRecord, Array[Byte]](testRecord)
  val decoded = Injection.invert[FiscalRecord, Array[Byte]](encoded)
  decoded.get must_== testRecord
}

// Levels outside 1 to 9 must be rejected when the Injection is created.
"Cannot create Specific Injection with Deflate compression if compression level is set too low" in {
  SpecificAvroCodecs.withDeflateCompression[FiscalRecord](0) must throwA[IllegalArgumentException]
}

"Cannot create Specific Injection with Deflate compression if compression level is set too high" in {
  SpecificAvroCodecs.withDeflateCompression[FiscalRecord](10) must throwA[IllegalArgumentException]
}

"Round trip specific record using Specific Injection with Snappy compression" in {
  implicit val specificInjection: Injection[FiscalRecord, Array[Byte]] =
    SpecificAvroCodecs.withSnappyCompression[FiscalRecord]
  val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
  val encoded = Injection[FiscalRecord, Array[Byte]](testRecord)
  val decoded = Injection.invert[FiscalRecord, Array[Byte]](encoded)
  decoded.get must_== testRecord
}

"Round trip specific record using Binary Injection" in {
implicit val specificBinaryInjection = SpecificAvroCodecs.toBinary[FiscalRecord]
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
Expand Down