From fea677b253efd358e1b3097c027e323020d1059d Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Tue, 22 Nov 2016 11:54:14 +0100 Subject: [PATCH 1/5] Add lz4 compression --- message.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/message.go b/message.go index 0f0ca5b6d..4dc54f10d 100644 --- a/message.go +++ b/message.go @@ -8,6 +8,7 @@ import ( "time" "github.com/eapache/go-xerial-snappy" + "github.com/pierrec/lz4" ) // CompressionCodec represents the various compression codecs recognized by Kafka in messages. @@ -20,6 +21,7 @@ const ( CompressionNone CompressionCodec = 0 CompressionGZIP CompressionCodec = 1 CompressionSnappy CompressionCodec = 2 + CompressionLZ4 CompressionCodec = 3 ) type Message struct { @@ -74,6 +76,18 @@ func (m *Message) encode(pe packetEncoder) error { tmp := snappy.Encode(m.Value) m.compressedCache = tmp payload = m.compressedCache + case CompressionLZ4: + var buf bytes.Buffer + writer := lz4.NewWriter(&buf) + if _, err = writer.Write(m.Value); err != nil { + return err + } + if err = writer.Close(); err != nil { + return err + } + m.compressedCache = buf.Bytes() + payload = m.compressedCache + default: return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)} } @@ -148,6 +162,18 @@ func (m *Message) decode(pd packetDecoder) (err error) { if err := m.decodeSet(); err != nil { return err } + case CompressionLZ4: + if m.Value == nil { + break + } + reader := lz4.NewReader(bytes.NewReader(m.Value)) + if m.Value, err = ioutil.ReadAll(reader); err != nil { + return err + } + if err := m.decodeSet(); err != nil { + return err + } + default: return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)} } From 645b0298c14e71d5f34497fa03f8c3a0a52a5c34 Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Tue, 22 Nov 2016 17:42:31 +0100 Subject: [PATCH 2/5] Guard against using LZ4 with old kafka protocol versions --- config.go | 6 ++++++ config_test.go | 12 ++++++++++++ 2 files changed, 18 insertions(+) diff --git a/config.go b/config.go index 9cab83491..ed1a6042c 100644 --- a/config.go +++ b/config.go @@ -375,6 +375,12 @@ func (c *Config) Validate() error { return ConfigurationError("Producer.Retry.Backoff must be >= 0") } + if c.Producer.Compression == CompressionLZ4 && (c.Version == V0_8_2_0 || c.Version == V0_8_2_1 || + c.Version == V0_8_2_2 || c.Version == V0_9_0_0 || + c.Version == V0_9_0_1) { + return ConfigurationError("lz4 compression requires Version >= V0_10_0_0") + } + // validate the Consumer values switch { case c.Consumer.Fetch.Min <= 0: diff --git a/config_test.go b/config_test.go index 473c59cfa..5fef6b361 100644 --- a/config_test.go +++ b/config_test.go @@ -33,6 +33,18 @@ func TestEmptyClientIDConfigValidates(t *testing.T) { } } +func TestLZ4ConfigValidation(t *testing.T) { + config := NewConfig() + config.Producer.Compression = CompressionLZ4 + if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" { + t.Error("Expected invalid lz4/kakfa version error, got ", err) + } + config.Version = V0_10_0_0 + if err := config.Validate(); err != nil { + t.Error("Expected lz4 to work, got ", err) + } +} + // This example shows how to integrate with an existing registry as well as publishing metrics // on the standard output func ExampleConfig_metrics() { From dc5a7b89a15a6b765549ce648d9a9ddcf21985fb Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Tue, 22 Nov 2016 21:37:25 +0100 Subject: [PATCH 3/5] Simplify version check for lz4 and kafka 0.10.0.0. --- config.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/config.go b/config.go index ed1a6042c..2d7e8eb63 100644 --- a/config.go +++ b/config.go @@ -375,9 +375,7 @@ func (c *Config) Validate() error { return ConfigurationError("Producer.Retry.Backoff must be >= 0") } - if c.Producer.Compression == CompressionLZ4 && (c.Version == V0_8_2_0 || c.Version == V0_8_2_1 || - c.Version == V0_8_2_2 || c.Version == V0_9_0_0 || - c.Version == V0_9_0_1) { + if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) { return ConfigurationError("lz4 compression requires Version >= V0_10_0_0") } From 3b10566dfd8b3b29f65737d5e41352e88c2abb33 Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Tue, 22 Nov 2016 21:38:01 +0100 Subject: [PATCH 4/5] Add lz4 decode test --- message_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/message_test.go b/message_test.go index 1dae896fe..90eb70be6 100644 --- a/message_test.go +++ b/message_test.go @@ -41,6 +41,20 @@ var ( 0x1f, 0x8b, // Gzip Magic 0x08, // deflate compressed 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0} + + emptyBulkLZ4Message = []byte{ + 246, 12, 188, 129, // CRC + 0x01, // Version + 0x03, // attribute flags (LZ4) + 255, 255, 249, 209, 212, 181, 73, 201, // timestamp + 0xFF, 0xFF, 0xFF, 0xFF, // key + 0x00, 0x00, 0x00, 0x47, // len + 0x04, 0x22, 0x4D, 0x18, // magic number lz4 + 100, // lz4 flags 01100100 + // version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00 + 112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, + 71, 129, 23, 111, // LZ4 checksum + } ) func TestMessageEncoding(t *testing.T) { @@ -111,3 +125,19 @@ func TestMessageDecodingBulkGzip(t *testing.T) { t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages)) } } + +func TestMessageDecodingBulkLZ4(t *testing.T) { + message := Message{} + testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message) + if message.Codec != CompressionLZ4 { + t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4) + } + if message.Key != nil { + t.Errorf("Decoding produced key %+v, but none was expected.", message.Key) + } + if message.Set == nil { + t.Error("Decoding produced no set, but one was expected.") + } else if len(message.Set.Messages) != 2 { + t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages)) + } +} From 08ba6f6d5cdf9610c7630f5dc9c050ec1453ad49 Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Tue, 22 Nov 2016 21:59:38 +0100 Subject: [PATCH 5/5] Add lz4 encoding test --- message_test.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/message_test.go b/message_test.go index 90eb70be6..af70b7b18 100644 --- a/message_test.go +++ b/message_test.go @@ -1,6 +1,9 @@ package sarama -import "testing" +import ( + "testing" + "time" +) var ( emptyMessage = []byte{ @@ -21,6 +24,19 @@ var ( 0x08, 0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0} + emptyLZ4Message = []byte{ + 132, 219, 238, 101, // CRC + 0x01, // version byte + 0x03, // attribute flags: lz4 + 0, 0, 1, 88, 141, 205, 89, 56, // timestamp + 0xFF, 0xFF, 0xFF, 0xFF, // key + 0x00, 0x00, 0x00, 0x0f, // len + 0x04, 0x22, 0x4D, 0x18, // LZ4 magic number + 100, // LZ4 flags: version 01, block indepedant, content checksum + 112, 185, 0, 0, 0, 0, // LZ4 data + 5, 93, 204, 2, // LZ4 checksum + } + emptyBulkSnappyMessage = []byte{ 180, 47, 53, 209, //CRC 0x00, // magic version byte @@ -64,6 +80,12 @@ func TestMessageEncoding(t *testing.T) { message.Value = []byte{} message.Codec = CompressionGZIP testEncodable(t, "empty gzip", &message, emptyGzipMessage) + + message.Value = []byte{} + message.Codec = CompressionLZ4 + message.Timestamp = time.Unix(1479847795, 0) + message.Version = 1 + testEncodable(t, "empty lz4", &message, emptyLZ4Message) } func TestMessageDecoding(t *testing.T) {