Skip to content

Commit

Permalink
Merge pull request #588 from dim/kafka-09-consumer-groups
Browse files Browse the repository at this point in the history
Consumer groups on Kafka 0.9
  • Loading branch information
eapache committed Jan 4, 2016
2 parents 2b18ad7 + 77de7e9 commit 66d77e1
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 0 deletions.
44 changes: 44 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
return response, nil
}

func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
response := new(JoinGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
response := new(SyncGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
response := new(LeaveGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
response := new(HeartbeatResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
48 changes: 48 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,52 @@ var brokerTestTable = []struct {
t.Error("Offset request got no response!")
}
}},

{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
response, err := broker.JoinGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("JoinGroup request got no response!")
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
response, err := broker.SyncGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("SyncGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
response, err := broker.LeaveGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("LeaveGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
response, err := broker.Heartbeat(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Heartbeat request got no response!")
}
}},
}
94 changes: 94 additions & 0 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sarama

type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
}

func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putStringArray(m.Topics); err != nil {
return err
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

if m.Topics, err = pd.getStringArray(); err != nil {
return
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}

type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
UserData []byte
}

func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putArrayLength(len(m.Topics)); err != nil {
return err
}

for topic, partitions := range m.Topics {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putInt32Array(partitions); err != nil {
return err
}
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

var topicLen int
if topicLen, err = pd.getArrayLength(); err != nil {
return
}

m.Topics = make(map[string][]int32, topicLen)
for i := 0; i < topicLen; i++ {
var topic string
if topic, err = pd.getString(); err != nil {
return
}
if m.Topics[topic], err = pd.getInt32Array(); err != nil {
return
}
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}
77 changes: 77 additions & 0 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sarama

import (
"bytes"
"reflect"
"testing"
)

var (
groupMemberMetadata = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 2, // Topic two, partition array length
0, 0, 0, 1, 0, 0, 0, 3, // 1, 3
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
)

func TestConsumerGroupMemberMetadata(t *testing.T) {
meta := &ConsumerGroupMemberMetadata{
Version: 1,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
}

meta2 := new(ConsumerGroupMemberMetadata)
err = decode(buf, meta2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(meta, meta2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2)
}
}

func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{
"one": []int32{0, 2, 4},
"two": []int32{1, 3},
},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(amt)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
}

amt2 := new(ConsumerGroupMemberAssignment)
err = decode(buf, amt2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(amt, amt2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2)
}
}
10 changes: 10 additions & 0 deletions join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {

r.GroupProtocols[name] = metadata
}

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
bin, err := encode(metadata)
if err != nil {
return err
}

r.AddGroupProtocol(name, bin)
return nil
}
12 changes: 12 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ type JoinGroupResponse struct {
Members map[string][]byte
}

func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
for id, bin := range r.Members {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(bin, meta); err != nil {
return nil, err
}
members[id] = *meta
}
return members, nil
}

func (r *JoinGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
pe.putInt32(r.GenerationId)
Expand Down
10 changes: 10 additions & 0 deletions sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment

r.GroupAssignments[memberId] = memberAssignment
}

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
bin, err := encode(memberAssignment)
if err != nil {
return err
}

r.AddGroupAssignment(memberId, bin)
return nil
}
6 changes: 6 additions & 0 deletions sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ type SyncGroupResponse struct {
MemberAssignment []byte
}

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
assignment := new(ConsumerGroupMemberAssignment)
err := decode(r.MemberAssignment, assignment)
return assignment, err
}

func (r *SyncGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
return pe.putBytes(r.MemberAssignment)
Expand Down

0 comments on commit 66d77e1

Please sign in to comment.