Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make decoding unknown message versions error #940

Merged
merged 2 commits into from
Aug 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crc32_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"encoding/binary"
"fmt"
"hash/crc32"
)

Expand All @@ -27,8 +28,9 @@ func (c *crc32Field) run(curOffset int, buf []byte) error {
func (c *crc32Field) check(curOffset int, buf []byte) error {
crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])

if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
return PacketDecodingError{"CRC didn't match"}
expected := binary.BigEndian.Uint32(buf[c.startOffset:])
if crc != expected {
return PacketDecodingError{fmt.Sprintf("CRC didn't match expected %#x got %#x", expected, crc)}
}

return nil
Expand Down
6 changes: 5 additions & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,17 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}

if m.Version > 1 {
return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
}

attribute, err := pd.getInt8()
if err != nil {
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)

if m.Version >= 1 {
if m.Version == 1 {
millis, err := pd.getInt64()
if err != nil {
return err
Expand Down
31 changes: 31 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ var (
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value

emptyV1Message = []byte{
204, 47, 121, 217, // CRC
0x01, // magic version byte
0x00, // attribute flags
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value

emptyV2Message = []byte{
167, 236, 104, 3, // CRC
0x02, // magic version byte
0x00, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value

emptyGzipMessage = []byte{
97, 79, 149, 90, //CRC
0x00, // magic version byte
Expand Down Expand Up @@ -179,3 +194,19 @@ func TestMessageDecodingBulkLZ4(t *testing.T) {
t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
}
}

func TestMessageDecodingVersion1(t *testing.T) {
message := Message{Version: 1}
testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
}

func TestMessageDecodingUnknownVersions(t *testing.T) {
message := Message{Version: 2}
err := decode(emptyV2Message, &message)
if err == nil {
t.Error("Decoding did not produce an error for an unknown magic byte")
}
if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
t.Error("Decoding an unknown magic byte produced an unknown error ", err)
}
}