Skip to content

Commit

Permalink
make decoding unknown message versions error
Browse files Browse the repository at this point in the history
If sarama encounters an unknown message version somehow (e.g. due to a
bug in Kafka 0.11's downconversion), right now sarama will carry on
decoding the message, resulting in much harder to understand errors.

Instead, bail out early with a friendly error message.
  • Loading branch information
tcrayford committed Aug 25, 2017
1 parent 55b3f6a commit 0eb6982
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
6 changes: 5 additions & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,17 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}

if m.Version > 1 {
return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
}

attribute, err := pd.getInt8()
if err != nil {
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)

if m.Version >= 1 {
if m.Version == 1 {
millis, err := pd.getInt64()
if err != nil {
return err
Expand Down
31 changes: 31 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ var (
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value

emptyV1Message = []byte{
167, 236, 104, 3, // CRC
0x01, // magic version byte
0x00, // attribute flags
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value

emptyV2Message = []byte{
167, 236, 104, 3, // CRC
0x02, // magic version byte
0x00, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value

emptyGzipMessage = []byte{
97, 79, 149, 90, //CRC
0x00, // magic version byte
Expand Down Expand Up @@ -179,3 +194,19 @@ func TestMessageDecodingBulkLZ4(t *testing.T) {
t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
}
}

func TestMessageDecodingVersion1(t *testing.T) {
message := Message{Version: 1}
testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
}

func TestMessageDecodingUnknownVersions(t *testing.T) {
message := Message{Version: 2}
err := decode(emptyV2Message, &message)
if err == nil {
t.Error("Decoding did not produce an error for an unknown magic byte")
}
if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
t.Error("Decoding an unknown magic byte produced an unknown error ", err)
}
}

0 comments on commit 0eb6982

Please sign in to comment.