From f7ed0d9d3e423d84ef1ffd2446b27caa709adb9c Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Wed, 5 Oct 2016 16:05:33 +0200 Subject: [PATCH 1/4] Add test for broken message version if compression is enabled. --- produce_set_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/produce_set_test.go b/produce_set_test.go index da62da914..d016a10b7 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -141,3 +141,45 @@ func TestProduceSetRequestBuilding(t *testing.T) { t.Error("Wrong number of topics in request") } } + +func TestProduceSetCompressedRequestBuilding(t *testing.T) { + parent, ps := makeProduceSet() + parent.conf.Producer.RequiredAcks = WaitForAll + parent.conf.Producer.Timeout = 10 * time.Second + parent.conf.Producer.Compression = CompressionGZIP + parent.conf.Version = V0_10_0_0 + + msg := &ProducerMessage{ + Topic: "t1", + Partition: 0, + Key: StringEncoder(TestMessage), + Value: StringEncoder(TestMessage), + Timestamp: time.Now(), + } + for i := 0; i < 10; i++ { + safeAddMessage(t, ps, msg) + } + + req := ps.buildRequest() + + if req.Version != 2 { + t.Error("Wrong request version") + } + + for _, msgBlock := range req.msgSets["t1"][0].Messages { + msg := msgBlock.Msg + err := msg.decodeSet() + if err != nil { + t.Error("Failed to decode set from payload") + } + for _, compMsgBlock := range msg.Set.Messages { + compMsg := compMsgBlock.Msg + if compMsg.Version != 1 { + t.Error("Wrong compressed message version") + } + } + if msg.Version != 1 { + t.Error("Wrong compressed parent message version") + } + } +} From b1f4708fb4482310066d57097b49bfea465d9a77 Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Wed, 5 Oct 2016 16:15:58 +0200 Subject: [PATCH 2/4] Fix message version/timestamp for compressed messages on kafka >= 0.10.0.0 --- produce_set.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/produce_set.go b/produce_set.go index 992f1f141..60d1e9884 100644 --- a/produce_set.go +++ b/produce_set.go @@ -90,11 +90,29 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Logger.Println(err) // if this happens, it's basically our fault. panic(err) } - req.AddMessage(topic, partition, &Message{ + compMsg := &Message{ Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: payload, - }) + } + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + // Compressed messages must use a protocol version + // that is newer than the inner messages version. + // Due to a lack of better timestamp notation copy the oldest + // (earliest) timestamp to message. + for _, msgBlock := range set.setToSend.Messages { + msg := msgBlock.Msg + if msg.Version > compMsg.Version { + compMsg.Version = msg.Version + } + if !msg.Timestamp.IsZero() && + (compMsg.Timestamp.IsZero() || + compMsg.Timestamp.After(msg.Timestamp)) { + compMsg.Timestamp = msg.Timestamp + } + } + } + req.AddMessage(topic, partition, compMsg) } } } From 425aaad160e2954e12dde6c502ebf29c631e7d2e Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Thu, 6 Oct 2016 11:00:50 +0200 Subject: [PATCH 3/4] Use default timestamp of time.Now and message version 1 if possible --- produce_set.go | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/produce_set.go b/produce_set.go index 60d1e9884..ec3bc7c38 100644 --- a/produce_set.go +++ b/produce_set.go @@ -53,8 +53,12 @@ func (ps *produceSet) add(msg *ProducerMessage) error { set.msgs = append(set.msgs, msg) msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} - if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) && !msg.Timestamp.IsZero() { - msgToSend.Timestamp = msg.Timestamp + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + if msg.Timestamp.IsZero() { + msgToSend.Timestamp = time.Now() + } else { + msgToSend.Timestamp = msg.Timestamp + } msgToSend.Version = 1 } set.setToSend.addMessage(msgToSend) @@ -96,21 +100,8 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Value: payload, } if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { - // Compressed messages must use a protocol version - // that is newer than the inner messages version. - // Due to a lack of better timestamp notation copy the oldest - // (earliest) timestamp to message. - for _, msgBlock := range set.setToSend.Messages { - msg := msgBlock.Msg - if msg.Version > compMsg.Version { - compMsg.Version = msg.Version - } - if !msg.Timestamp.IsZero() && - (compMsg.Timestamp.IsZero() || - compMsg.Timestamp.After(msg.Timestamp)) { - compMsg.Timestamp = msg.Timestamp - } - } + compMsg.Version = 1 + compMsg.Timestamp = time.Now() } req.AddMessage(topic, partition, compMsg) } From 413f13f95f3b5d66f9c3487696f680aefc76a569 Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Thu, 6 Oct 2016 17:16:58 +0200 Subject: [PATCH 4/4] Approximate timestamp of compressed message The first messages timestamp of a set is a proxy for the actual timestamp of the group in many cases. --- produce_set.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/produce_set.go b/produce_set.go index ec3bc7c38..74025b0e9 100644 --- a/produce_set.go +++ b/produce_set.go @@ -101,7 +101,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { } if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { compMsg.Version = 1 - compMsg.Timestamp = time.Now() + compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp } req.AddMessage(topic, partition, compMsg) }