diff --git a/broker_test.go b/broker_test.go index 1525bcac6..9263cef8b 100644 --- a/broker_test.go +++ b/broker_test.go @@ -71,7 +71,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { // Set the broker id in order to validate local broker metrics broker.id = 0 conf := NewConfig() - conf.Version = V0_10_0_0 + conf.Version = tt.version err := broker.Open(conf) if err != nil { t.Fatal(err) @@ -97,11 +97,13 @@ func TestSimpleBrokerCommunication(t *testing.T) { // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake var brokerTestTable = []struct { + version KafkaVersion name string response []byte runner func(*testing.T, *Broker) }{ - {"MetadataRequest", + {V0_10_0_0, + "MetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := MetadataRequest{} @@ -114,7 +116,8 @@ var brokerTestTable = []struct { } }}, - {"ConsumerMetadataRequest", + {V0_10_0_0, + "ConsumerMetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ConsumerMetadataRequest{} @@ -127,7 +130,8 @@ var brokerTestTable = []struct { } }}, - {"ProduceRequest (NoResponse)", + {V0_10_0_0, + "ProduceRequest (NoResponse)", []byte{}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} @@ -141,7 +145,8 @@ var brokerTestTable = []struct { } }}, - {"ProduceRequest (WaitForLocal)", + {V0_10_0_0, + "ProduceRequest (WaitForLocal)", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} @@ -155,7 +160,8 @@ var brokerTestTable = []struct { } }}, - {"FetchRequest", + {V0_10_0_0, + "FetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := FetchRequest{} @@ -168,7 +174,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetFetchRequest", + {V0_10_0_0, + "OffsetFetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetFetchRequest{} @@ -181,7 +188,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetCommitRequest", + {V0_10_0_0, + "OffsetCommitRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetCommitRequest{} @@ -194,7 +202,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetRequest", + {V0_10_0_0, + "OffsetRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetRequest{} @@ -207,7 +216,8 @@ var brokerTestTable = []struct { } }}, - {"JoinGroupRequest", + {V0_10_0_0, + "JoinGroupRequest", []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := JoinGroupRequest{} @@ -220,7 +230,8 @@ var brokerTestTable = []struct { } }}, - {"SyncGroupRequest", + {V0_10_0_0, + "SyncGroupRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := SyncGroupRequest{} @@ -233,7 +244,8 @@ var brokerTestTable = []struct { } }}, - {"LeaveGroupRequest", + {V0_10_0_0, + "LeaveGroupRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := LeaveGroupRequest{} @@ -246,7 +258,8 @@ var brokerTestTable = []struct { } }}, - {"HeartbeatRequest", + {V0_10_0_0, + "HeartbeatRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := HeartbeatRequest{} @@ -259,7 +272,8 @@ var brokerTestTable = []struct { } }}, - {"ListGroupsRequest", + {V0_10_0_0, + "ListGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ListGroupsRequest{} @@ -272,7 +286,8 @@ var brokerTestTable = []struct { } }}, - {"DescribeGroupsRequest", + {V0_10_0_0, + "DescribeGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DescribeGroupsRequest{} @@ -285,7 +300,8 @@ var brokerTestTable = []struct { } }}, - {"ApiVersionsRequest", + {V0_10_0_0, + "ApiVersionsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ApiVersionsRequest{} @@ -298,7 +314,8 @@ var brokerTestTable = []struct { } }}, - {"DeleteGroupsRequest", + {V1_1_0_0, + "DeleteGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DeleteGroupsRequest{}