From 21f7755de1f832f69796ff931e21f8463d99e5ba Mon Sep 17 00:00:00 2001 From: Willem van Bergen Date: Mon, 7 Dec 2015 13:43:33 -0500 Subject: [PATCH] Add SyncGroup request and response pair. --- sync_group_request.go | 93 ++++++++++++++++++++++++++++++++++++++++++ sync_group_response.go | 22 ++++++++++ 2 files changed, 115 insertions(+) create mode 100644 sync_group_request.go create mode 100644 sync_group_response.go diff --git a/sync_group_request.go b/sync_group_request.go new file mode 100644 index 0000000000..d8babcc631 --- /dev/null +++ b/sync_group_request.go @@ -0,0 +1,93 @@ +package sarama + +type SyncGroupRequest struct { + GroupId string + GenerationId string + MemberId string + GroupAssignments []*GroupAssignment +} + +func (r *SyncGroupRequest) encode(pe packetEncoder) error { + if err := pe.putString(r.GroupId); err != nil { + return err + } + if err := pe.putString(r.GenerationId); err != nil { + return err + } + if err := pe.putString(r.MemberId); err != nil { + return err + } + + if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil { + return err + } + for _, groupAssignment := range r.GroupAssignments { + if err := groupAssignment.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (r *SyncGroupRequest) decode(pd packetDecoder) (err error) { + if r.GroupId, err = pd.getString(); err != nil { + return + } + if r.GenerationId, err = pd.getString(); err != nil { + return + } + if r.MemberId, err = pd.getString(); err != nil { + return + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + r.GroupAssignments = make([]*GroupAssignment, n) + for i := 0; i < n; i++ { + r.GroupAssignments[i] = new(GroupAssignment) + if err := r.GroupAssignments[i].decode(pd); err != nil { + return err + } + } + + return nil +} + +func (r *SyncGroupRequest) key() int16 { + return 14 +} + +func (r *SyncGroupRequest) version() int16 { + return 0 +} + +type GroupAssignment struct { + MemberId string + MemberAssignment []byte +} + +func (gd *GroupAssignment) encode(pe packetEncoder) error { + if err := pe.putString(gd.MemberId); err != nil { + return err + } + if err := pe.putBytes(gd.MemberAssignment); err != nil { + return err + } + + return nil +} + +func (gd *GroupAssignment) decode(pd packetDecoder) (err error) { + if gd.MemberId, err = pd.getString(); err != nil { + return + } + if gd.MemberAssignment, err = pd.getBytes(); err != nil { + return + } + + return nil +} diff --git a/sync_group_response.go b/sync_group_response.go new file mode 100644 index 0000000000..e10685ef88 --- /dev/null +++ b/sync_group_response.go @@ -0,0 +1,22 @@ +package sarama + +type SyncGroupResponse struct { + Err KError + MemberAssignment []byte +} + +func (r *SyncGroupResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + return pe.putBytes(r.MemberAssignment) +} + +func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + r.MemberAssignment, err = pd.getBytes() + return +}