Skip to content

Commit

Permalink
Make kafka-version configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
RussellLuo committed May 4, 2018
1 parent 7d531e2 commit 61cb20c
Showing 1 changed file with 34 additions and 17 deletions.
51 changes: 34 additions & 17 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
// Set the broker id in order to validate local broker metrics
broker.id = 0
conf := NewConfig()
conf.Version = V0_10_0_0
conf.Version = tt.version
err := broker.Open(conf)
if err != nil {
t.Fatal(err)
Expand All @@ -97,11 +97,13 @@ func TestSimpleBrokerCommunication(t *testing.T) {

// We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
var brokerTestTable = []struct {
version KafkaVersion
name string
response []byte
runner func(*testing.T, *Broker)
}{
{"MetadataRequest",
{V0_10_0_0,
"MetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := MetadataRequest{}
Expand All @@ -114,7 +116,8 @@ var brokerTestTable = []struct {
}
}},

{"ConsumerMetadataRequest",
{V0_10_0_0,
"ConsumerMetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ConsumerMetadataRequest{}
Expand All @@ -127,7 +130,8 @@ var brokerTestTable = []struct {
}
}},

{"ProduceRequest (NoResponse)",
{V0_10_0_0,
"ProduceRequest (NoResponse)",
[]byte{},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
Expand All @@ -141,7 +145,8 @@ var brokerTestTable = []struct {
}
}},

{"ProduceRequest (WaitForLocal)",
{V0_10_0_0,
"ProduceRequest (WaitForLocal)",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
Expand All @@ -155,7 +160,8 @@ var brokerTestTable = []struct {
}
}},

{"FetchRequest",
{V0_10_0_0,
"FetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := FetchRequest{}
Expand All @@ -168,7 +174,8 @@ var brokerTestTable = []struct {
}
}},

{"OffsetFetchRequest",
{V0_10_0_0,
"OffsetFetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetFetchRequest{}
Expand All @@ -181,7 +188,8 @@ var brokerTestTable = []struct {
}
}},

{"OffsetCommitRequest",
{V0_10_0_0,
"OffsetCommitRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetCommitRequest{}
Expand All @@ -194,7 +202,8 @@ var brokerTestTable = []struct {
}
}},

{"OffsetRequest",
{V0_10_0_0,
"OffsetRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetRequest{}
Expand All @@ -207,7 +216,8 @@ var brokerTestTable = []struct {
}
}},

{"JoinGroupRequest",
{V0_10_0_0,
"JoinGroupRequest",
[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
Expand All @@ -220,7 +230,8 @@ var brokerTestTable = []struct {
}
}},

{"SyncGroupRequest",
{V0_10_0_0,
"SyncGroupRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
Expand All @@ -233,7 +244,8 @@ var brokerTestTable = []struct {
}
}},

{"LeaveGroupRequest",
{V0_10_0_0,
"LeaveGroupRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
Expand All @@ -246,7 +258,8 @@ var brokerTestTable = []struct {
}
}},

{"HeartbeatRequest",
{V0_10_0_0,
"HeartbeatRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
Expand All @@ -259,7 +272,8 @@ var brokerTestTable = []struct {
}
}},

{"ListGroupsRequest",
{V0_10_0_0,
"ListGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ListGroupsRequest{}
Expand All @@ -272,7 +286,8 @@ var brokerTestTable = []struct {
}
}},

{"DescribeGroupsRequest",
{V0_10_0_0,
"DescribeGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DescribeGroupsRequest{}
Expand All @@ -285,7 +300,8 @@ var brokerTestTable = []struct {
}
}},

{"ApiVersionsRequest",
{V0_10_0_0,
"ApiVersionsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ApiVersionsRequest{}
Expand All @@ -298,7 +314,8 @@ var brokerTestTable = []struct {
}
}},

{"DeleteGroupsRequest",
{V1_1_0_0,
"DeleteGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DeleteGroupsRequest{}
Expand Down

0 comments on commit 61cb20c

Please sign in to comment.