diff --git a/config.go b/config.go index a4235c3c8..a417a38b2 100644 --- a/config.go +++ b/config.go @@ -380,6 +380,10 @@ func (c *Config) Validate() error { return ConfigurationError("Producer.Retry.Backoff must be >= 0") } + if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) { + return ConfigurationError("lz4 compression requires Version >= V0_10_0_0") + } + // validate the Consumer values switch { case c.Consumer.Fetch.Min <= 0: diff --git a/config_test.go b/config_test.go index 473c59cfa..5fef6b361 100644 --- a/config_test.go +++ b/config_test.go @@ -33,6 +33,18 @@ func TestEmptyClientIDConfigValidates(t *testing.T) { } } +func TestLZ4ConfigValidation(t *testing.T) { + config := NewConfig() + config.Producer.Compression = CompressionLZ4 + if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" { + t.Error("Expected invalid lz4/kakfa version error, got ", err) + } + config.Version = V0_10_0_0 + if err := config.Validate(); err != nil { + t.Error("Expected lz4 to work, got ", err) + } +} + // This example shows how to integrate with an existing registry as well as publishing metrics // on the standard output func ExampleConfig_metrics() { diff --git a/message.go b/message.go index 8b8e4039c..327c5fa2a 100644 --- a/message.go +++ b/message.go @@ -8,6 +8,7 @@ import ( "time" "github.com/eapache/go-xerial-snappy" + "github.com/pierrec/lz4" ) // CompressionCodec represents the various compression codecs recognized by Kafka in messages. @@ -20,6 +21,7 @@ const ( CompressionNone CompressionCodec = 0 CompressionGZIP CompressionCodec = 1 CompressionSnappy CompressionCodec = 2 + CompressionLZ4 CompressionCodec = 3 ) type Message struct { @@ -75,6 +77,18 @@ func (m *Message) encode(pe packetEncoder) error { tmp := snappy.Encode(m.Value) m.compressedCache = tmp payload = m.compressedCache + case CompressionLZ4: + var buf bytes.Buffer + writer := lz4.NewWriter(&buf) + if _, err = writer.Write(m.Value); err != nil { + return err + } + if err = writer.Close(); err != nil { + return err + } + m.compressedCache = buf.Bytes() + payload = m.compressedCache + default: return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)} } @@ -155,6 +169,18 @@ func (m *Message) decode(pd packetDecoder) (err error) { if err := m.decodeSet(); err != nil { return err } + case CompressionLZ4: + if m.Value == nil { + break + } + reader := lz4.NewReader(bytes.NewReader(m.Value)) + if m.Value, err = ioutil.ReadAll(reader); err != nil { + return err + } + if err := m.decodeSet(); err != nil { + return err + } + default: return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)} } diff --git a/message_test.go b/message_test.go index 1dae896fe..af70b7b18 100644 --- a/message_test.go +++ b/message_test.go @@ -1,6 +1,9 @@ package sarama -import "testing" +import ( + "testing" + "time" +) var ( emptyMessage = []byte{ @@ -21,6 +24,19 @@ var ( 0x08, 0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0} + emptyLZ4Message = []byte{ + 132, 219, 238, 101, // CRC + 0x01, // version byte + 0x03, // attribute flags: lz4 + 0, 0, 1, 88, 141, 205, 89, 56, // timestamp + 0xFF, 0xFF, 0xFF, 0xFF, // key + 0x00, 0x00, 0x00, 0x0f, // len + 0x04, 0x22, 0x4D, 0x18, // LZ4 magic number + 100, // LZ4 flags: version 01, block indepedant, content checksum + 112, 185, 0, 0, 0, 0, // LZ4 data + 5, 93, 204, 2, // LZ4 checksum + } + emptyBulkSnappyMessage = []byte{ 180, 47, 53, 209, //CRC 0x00, // magic version byte @@ -41,6 +57,20 @@ var ( 0x1f, 0x8b, // Gzip Magic 0x08, // deflate compressed 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0} + + emptyBulkLZ4Message = []byte{ + 246, 12, 188, 129, // CRC + 0x01, // Version + 0x03, // attribute flags (LZ4) + 255, 255, 249, 209, 212, 181, 73, 201, // timestamp + 0xFF, 0xFF, 0xFF, 0xFF, // key + 0x00, 0x00, 0x00, 0x47, // len + 0x04, 0x22, 0x4D, 0x18, // magic number lz4 + 100, // lz4 flags 01100100 + // version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00 + 112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, + 71, 129, 23, 111, // LZ4 checksum + } ) func TestMessageEncoding(t *testing.T) { @@ -50,6 +80,12 @@ func TestMessageEncoding(t *testing.T) { message.Value = []byte{} message.Codec = CompressionGZIP testEncodable(t, "empty gzip", &message, emptyGzipMessage) + + message.Value = []byte{} + message.Codec = CompressionLZ4 + message.Timestamp = time.Unix(1479847795, 0) + message.Version = 1 + testEncodable(t, "empty lz4", &message, emptyLZ4Message) } func TestMessageDecoding(t *testing.T) { @@ -111,3 +147,19 @@ func TestMessageDecodingBulkGzip(t *testing.T) { t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages)) } } + +func TestMessageDecodingBulkLZ4(t *testing.T) { + message := Message{} + testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message) + if message.Codec != CompressionLZ4 { + t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4) + } + if message.Key != nil { + t.Errorf("Decoding produced key %+v, but none was expected.", message.Key) + } + if message.Set == nil { + t.Error("Decoding produced no set, but one was expected.") + } else if len(message.Set.Messages) != 2 { + t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages)) + } +}