Skip to content

Commit

Permalink
Merge pull request #786 from rtreffer/master
Browse files Browse the repository at this point in the history
Add lz4 compression
  • Loading branch information
eapache authored Nov 22, 2016
2 parents 1cf30ac + 08ba6f6 commit e6df07f
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 1 deletion.
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ func (c *Config) Validate() error {
return ConfigurationError("Producer.Retry.Backoff must be >= 0")
}

if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) {
return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
}

// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
Expand Down
12 changes: 12 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ func TestEmptyClientIDConfigValidates(t *testing.T) {
}
}

func TestLZ4ConfigValidation(t *testing.T) {
config := NewConfig()
config.Producer.Compression = CompressionLZ4
if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
t.Error("Expected invalid lz4/kakfa version error, got ", err)
}
config.Version = V0_10_0_0
if err := config.Validate(); err != nil {
t.Error("Expected lz4 to work, got ", err)
}
}

// This example shows how to integrate with an existing registry as well as publishing metrics
// on the standard output
func ExampleConfig_metrics() {
Expand Down
26 changes: 26 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/eapache/go-xerial-snappy"
"github.com/pierrec/lz4"
)

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
Expand All @@ -20,6 +21,7 @@ const (
CompressionNone CompressionCodec = 0
CompressionGZIP CompressionCodec = 1
CompressionSnappy CompressionCodec = 2
CompressionLZ4 CompressionCodec = 3
)

type Message struct {
Expand Down Expand Up @@ -75,6 +77,18 @@ func (m *Message) encode(pe packetEncoder) error {
tmp := snappy.Encode(m.Value)
m.compressedCache = tmp
payload = m.compressedCache
case CompressionLZ4:
var buf bytes.Buffer
writer := lz4.NewWriter(&buf)
if _, err = writer.Write(m.Value); err != nil {
return err
}
if err = writer.Close(); err != nil {
return err
}
m.compressedCache = buf.Bytes()
payload = m.compressedCache

default:
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
}
Expand Down Expand Up @@ -155,6 +169,18 @@ func (m *Message) decode(pd packetDecoder) (err error) {
if err := m.decodeSet(); err != nil {
return err
}
case CompressionLZ4:
if m.Value == nil {
break
}
reader := lz4.NewReader(bytes.NewReader(m.Value))
if m.Value, err = ioutil.ReadAll(reader); err != nil {
return err
}
if err := m.decodeSet(); err != nil {
return err
}

default:
return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
}
Expand Down
54 changes: 53 additions & 1 deletion message_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "testing"
import (
"testing"
"time"
)

var (
emptyMessage = []byte{
Expand All @@ -21,6 +24,19 @@ var (
0x08,
0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}

emptyLZ4Message = []byte{
132, 219, 238, 101, // CRC
0x01, // version byte
0x03, // attribute flags: lz4
0, 0, 1, 88, 141, 205, 89, 56, // timestamp
0xFF, 0xFF, 0xFF, 0xFF, // key
0x00, 0x00, 0x00, 0x0f, // len
0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
100, // LZ4 flags: version 01, block indepedant, content checksum
112, 185, 0, 0, 0, 0, // LZ4 data
5, 93, 204, 2, // LZ4 checksum
}

emptyBulkSnappyMessage = []byte{
180, 47, 53, 209, //CRC
0x00, // magic version byte
Expand All @@ -41,6 +57,20 @@ var (
0x1f, 0x8b, // Gzip Magic
0x08, // deflate compressed
0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}

emptyBulkLZ4Message = []byte{
246, 12, 188, 129, // CRC
0x01, // Version
0x03, // attribute flags (LZ4)
255, 255, 249, 209, 212, 181, 73, 201, // timestamp
0xFF, 0xFF, 0xFF, 0xFF, // key
0x00, 0x00, 0x00, 0x47, // len
0x04, 0x22, 0x4D, 0x18, // magic number lz4
100, // lz4 flags 01100100
// version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00
112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
71, 129, 23, 111, // LZ4 checksum
}
)

func TestMessageEncoding(t *testing.T) {
Expand All @@ -50,6 +80,12 @@ func TestMessageEncoding(t *testing.T) {
message.Value = []byte{}
message.Codec = CompressionGZIP
testEncodable(t, "empty gzip", &message, emptyGzipMessage)

message.Value = []byte{}
message.Codec = CompressionLZ4
message.Timestamp = time.Unix(1479847795, 0)
message.Version = 1
testEncodable(t, "empty lz4", &message, emptyLZ4Message)
}

func TestMessageDecoding(t *testing.T) {
Expand Down Expand Up @@ -111,3 +147,19 @@ func TestMessageDecodingBulkGzip(t *testing.T) {
t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
}
}

func TestMessageDecodingBulkLZ4(t *testing.T) {
message := Message{}
testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message)
if message.Codec != CompressionLZ4 {
t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4)
}
if message.Key != nil {
t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
}
if message.Set == nil {
t.Error("Decoding produced no set, but one was expected.")
} else if len(message.Set.Messages) != 2 {
t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
}
}

0 comments on commit e6df07f

Please sign in to comment.