From 27954669453355f29642281edb87ef158a7762c1 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Wed, 23 Dec 2015 14:31:39 +0000 Subject: [PATCH 1/4] More 0.9 protocol additions. Add broker accessors --- broker.go | 44 +++++++++++++++++++ broker_test.go | 48 +++++++++++++++++++++ config.go | 17 ++++++++ group_members.go | 98 ++++++++++++++++++++++++++++++++++++++++++ group_members_test.go | 77 +++++++++++++++++++++++++++++++++ join_group_request.go | 10 +++++ join_group_response.go | 12 ++++++ sync_group_request.go | 10 +++++ sync_group_response.go | 6 +++ 9 files changed, 322 insertions(+) create mode 100644 group_members.go create mode 100644 group_members_test.go diff --git a/broker.go b/broker.go index 46f06a0f3..5bd1df2d6 100644 --- a/broker.go +++ b/broker.go @@ -239,6 +239,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, return response, nil } +func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) { + response := new(JoinGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) { + response := new(SyncGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) { + response := new(LeaveGroupResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) { + response := new(HeartbeatResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/broker_test.go b/broker_test.go index f1aa2e8ba..590a4dc28 100644 --- a/broker_test.go +++ b/broker_test.go @@ -176,4 +176,52 @@ var brokerTestTable = []struct { t.Error("Offset request got no response!") } }}, + + {[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := JoinGroupRequest{} + response, err := broker.JoinGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("JoinGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := SyncGroupRequest{} + response, err := broker.SyncGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("SyncGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := LeaveGroupRequest{} + response, err := broker.LeaveGroup(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("LeaveGroup request got no response!") + } + }}, + + {[]byte{0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := HeartbeatRequest{} + response, err := broker.Heartbeat(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("Heartbeat request got no response!") + } + }}, } diff --git a/config.go b/config.go index 542c61172..ecac0dcb3 100644 --- a/config.go +++ b/config.go @@ -54,6 +54,20 @@ type Config struct { RefreshFrequency time.Duration } + // Group is the namespace for group management properties + Group struct { + Session struct { + // The allowed session timeout for registered consumers (defaults to 30s). + // Must be within the allowed server range. + Timeout time.Duration + } + Heartbeat struct { + // Interval between each heartbeat (defaults to 3s). It should be no more + // than 1/3rd of the Group.Session.Timout setting + Interval time.Duration + } + } + // Producer is the namespace for configuration related to producing messages, // used by the Producer. Producer struct { @@ -212,6 +226,9 @@ func NewConfig() *Config { c.Metadata.Retry.Backoff = 250 * time.Millisecond c.Metadata.RefreshFrequency = 10 * time.Minute + c.Group.Session.Timeout = 30 * time.Second + c.Group.Heartbeat.Interval = 3 * time.Second + c.Producer.MaxMessageBytes = 1000000 c.Producer.RequiredAcks = WaitForLocal c.Producer.Timeout = 10 * time.Second diff --git a/group_members.go b/group_members.go new file mode 100644 index 000000000..5e8066d70 --- /dev/null +++ b/group_members.go @@ -0,0 +1,98 @@ +package sarama + +type GroupMemberMetadata struct { + Version int16 + Topics []string + UserData []byte +} + +func (m *GroupMemberMetadata) encode(pe packetEncoder) error { + pe.putInt16(m.Version) + + if err := pe.putStringArray(m.Topics); err != nil { + return err + } + + if err := pe.putBytes(m.UserData); err != nil { + return err + } + + return nil +} + +func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) { + if m.Version, err = pd.getInt16(); err != nil { + return + } + + if m.Topics, err = pd.getStringArray(); err != nil { + return + } + + if m.UserData, err = pd.getBytes(); err != nil { + return + } + + return nil +} + +type GroupMemberAssignment struct { + Version int16 + Topics []GroupMemberAssignedTopic + UserData []byte +} + +type GroupMemberAssignedTopic struct { + Topic string + Partitions []int32 +} + +func (m *GroupMemberAssignment) encode(pe packetEncoder) error { + pe.putInt16(m.Version) + + if err := pe.putArrayLength(len(m.Topics)); err != nil { + return err + } + + for _, topic := range m.Topics { + if err := pe.putString(topic.Topic); err != nil { + return err + } + if err := pe.putInt32Array(topic.Partitions); err != nil { + return err + } + } + + if err := pe.putBytes(m.UserData); err != nil { + return err + } + + return nil +} + +func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) { + if m.Version, err = pd.getInt16(); err != nil { + return + } + + var topicLen int + if topicLen, err = pd.getArrayLength(); err != nil { + return + } + + m.Topics = make([]GroupMemberAssignedTopic, topicLen) + for i := 0; i < topicLen; i++ { + if m.Topics[i].Topic, err = pd.getString(); err != nil { + return + } + if m.Topics[i].Partitions, err = pd.getInt32Array(); err != nil { + return + } + } + + if m.UserData, err = pd.getBytes(); err != nil { + return + } + + return nil +} diff --git a/group_members_test.go b/group_members_test.go new file mode 100644 index 000000000..93afde9d9 --- /dev/null +++ b/group_members_test.go @@ -0,0 +1,77 @@ +package sarama + +import ( + "bytes" + "reflect" + "testing" +) + +var ( + groupMemberMetadata = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + } + groupMemberAssignment = []byte{ + 0, 1, // Version + 0, 0, 0, 2, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 0, 0, 3, // Topic one, partition array length + 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4 + 0, 3, 't', 'w', 'o', // Topic two + 0, 0, 0, 2, // Topic two, partition array length + 0, 0, 0, 1, 0, 0, 0, 3, // 1, 3 + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + } +) + +func TestGroupMemberMetadata(t *testing.T) { + meta := &GroupMemberMetadata{ + Version: 1, + Topics: []string{"one", "two"}, + UserData: []byte{0x01, 0x02, 0x03}, + } + + buf, err := encode(meta) + if err != nil { + t.Error("Failed to encode data", err) + } else if !bytes.Equal(groupMemberMetadata, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf) + } + + meta2 := new(GroupMemberMetadata) + err = decode(buf, meta2) + if err != nil { + t.Error("Failed to decode data", err) + } else if !reflect.DeepEqual(meta, meta2) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2) + } +} + +func TestGroupMemberAssignment(t *testing.T) { + amt := &GroupMemberAssignment{ + Version: 1, + Topics: []GroupMemberAssignedTopic{ + {Topic: "one", Partitions: []int32{0, 2, 4}}, + {Topic: "two", Partitions: []int32{1, 3}}, + }, + UserData: []byte{0x01, 0x02, 0x03}, + } + + buf, err := encode(amt) + if err != nil { + t.Error("Failed to encode data", err) + } else if !bytes.Equal(groupMemberAssignment, buf) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf) + } + + amt2 := new(GroupMemberAssignment) + err = decode(buf, amt2) + if err != nil { + t.Error("Failed to decode data", err) + } else if !reflect.DeepEqual(amt, amt2) { + t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2) + } +} diff --git a/join_group_request.go b/join_group_request.go index 8bb5ce826..13812449e 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { r.GroupProtocols[name] = metadata } + +func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *GroupMemberMetadata) error { + bin, err := encode(metadata) + if err != nil { + return err + } + + r.AddGroupProtocol(name, bin) + return nil +} diff --git a/join_group_response.go b/join_group_response.go index 037a9cd26..15e16cf1c 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -9,6 +9,18 @@ type JoinGroupResponse struct { Members map[string][]byte } +func (r *JoinGroupResponse) GetMembers() (map[string]GroupMemberMetadata, error) { + members := make(map[string]GroupMemberMetadata, len(r.Members)) + for id, bin := range r.Members { + meta := new(GroupMemberMetadata) + if err := decode(bin, meta); err != nil { + return nil, err + } + members[id] = *meta + } + return members, nil +} + func (r *JoinGroupResponse) encode(pe packetEncoder) error { pe.putInt16(int16(r.Err)) pe.putInt32(r.GenerationId) diff --git a/sync_group_request.go b/sync_group_request.go index 60be6f3f3..b0faed2ef 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment r.GroupAssignments[memberId] = memberAssignment } + +func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *GroupMemberAssignment) error { + bin, err := encode(memberAssignment) + if err != nil { + return err + } + + r.AddGroupAssignment(memberId, bin) + return nil +} diff --git a/sync_group_response.go b/sync_group_response.go index e10685ef8..406c27db5 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -5,6 +5,12 @@ type SyncGroupResponse struct { MemberAssignment []byte } +func (r *SyncGroupResponse) GetMemberAssignment() (*GroupMemberAssignment, error) { + assignment := new(GroupMemberAssignment) + err := decode(r.MemberAssignment, assignment) + return assignment, err +} + func (r *SyncGroupResponse) encode(pe packetEncoder) error { pe.putInt16(int16(r.Err)) return pe.putBytes(r.MemberAssignment) From 914efc548fb3a06a5d15a0b6c984e62a28df267b Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Wed, 23 Dec 2015 20:56:42 +0000 Subject: [PATCH 2/4] Address feedback --- config.go | 14 ++++++++++++++ group_members.go | 20 ++++++++------------ group_members_test.go | 6 +++--- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/config.go b/config.go index ecac0dcb3..30658c1b6 100644 --- a/config.go +++ b/config.go @@ -276,6 +276,12 @@ func (c *Config) Validate() error { if c.Consumer.MaxWaitTime%time.Millisecond != 0 { Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.") } + if c.Group.Heartbeat.Interval%time.Millisecond != 0 { + Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.") + } + if c.Group.Session.Timeout%time.Millisecond != 0 { + Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.") + } if c.ClientID == "sarama" { Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.") } @@ -304,6 +310,14 @@ func (c *Config) Validate() error { return ConfigurationError("Metadata.RefreshFrequency must be >= 0") } + // validate the Group values + switch { + case c.Group.Heartbeat.Interval <= 0: + return ConfigurationError("Group.Heartbeat.Interval must be > 0") + case c.Group.Session.Timeout <= 0: + return ConfigurationError("Group.Session.Timeout must be > 0") + } + // validate the Producer values switch { case c.Producer.MaxMessageBytes <= 0: diff --git a/group_members.go b/group_members.go index 5e8066d70..bdf0e7343 100644 --- a/group_members.go +++ b/group_members.go @@ -38,15 +38,10 @@ func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) { type GroupMemberAssignment struct { Version int16 - Topics []GroupMemberAssignedTopic + Topics map[string][]int32 UserData []byte } -type GroupMemberAssignedTopic struct { - Topic string - Partitions []int32 -} - func (m *GroupMemberAssignment) encode(pe packetEncoder) error { pe.putInt16(m.Version) @@ -54,11 +49,11 @@ func (m *GroupMemberAssignment) encode(pe packetEncoder) error { return err } - for _, topic := range m.Topics { - if err := pe.putString(topic.Topic); err != nil { + for topic, partitions := range m.Topics { + if err := pe.putString(topic); err != nil { return err } - if err := pe.putInt32Array(topic.Partitions); err != nil { + if err := pe.putInt32Array(partitions); err != nil { return err } } @@ -80,12 +75,13 @@ func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) { return } - m.Topics = make([]GroupMemberAssignedTopic, topicLen) + m.Topics = make(map[string][]int32, topicLen) for i := 0; i < topicLen; i++ { - if m.Topics[i].Topic, err = pd.getString(); err != nil { + var topic string + if topic, err = pd.getString(); err != nil { return } - if m.Topics[i].Partitions, err = pd.getInt32Array(); err != nil { + if m.Topics[topic], err = pd.getInt32Array(); err != nil { return } } diff --git a/group_members_test.go b/group_members_test.go index 93afde9d9..599502e0d 100644 --- a/group_members_test.go +++ b/group_members_test.go @@ -53,9 +53,9 @@ func TestGroupMemberMetadata(t *testing.T) { func TestGroupMemberAssignment(t *testing.T) { amt := &GroupMemberAssignment{ Version: 1, - Topics: []GroupMemberAssignedTopic{ - {Topic: "one", Partitions: []int32{0, 2, 4}}, - {Topic: "two", Partitions: []int32{1, 3}}, + Topics: map[string][]int32{ + "one": []int32{0, 2, 4}, + "two": []int32{1, 3}, }, UserData: []byte{0x01, 0x02, 0x03}, } From 83f973c2d6d898c349c3eecfa65fd0e38b24e172 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Wed, 23 Dec 2015 22:11:41 +0000 Subject: [PATCH 3/4] More feedback, renamed structs --- group_members.go => consumer_group_members.go | 12 ++++++------ ...members_test.go => consumer_group_members_test.go | 12 ++++++------ join_group_request.go | 2 +- join_group_response.go | 6 +++--- sync_group_request.go | 2 +- sync_group_response.go | 4 ++-- 6 files changed, 19 insertions(+), 19 deletions(-) rename group_members.go => consumer_group_members.go (77%) rename group_members_test.go => consumer_group_members_test.go (87%) diff --git a/group_members.go b/consumer_group_members.go similarity index 77% rename from group_members.go rename to consumer_group_members.go index bdf0e7343..9d92d350a 100644 --- a/group_members.go +++ b/consumer_group_members.go @@ -1,12 +1,12 @@ package sarama -type GroupMemberMetadata struct { +type ConsumerGroupMemberMetadata struct { Version int16 Topics []string UserData []byte } -func (m *GroupMemberMetadata) encode(pe packetEncoder) error { +func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { pe.putInt16(m.Version) if err := pe.putStringArray(m.Topics); err != nil { @@ -20,7 +20,7 @@ func (m *GroupMemberMetadata) encode(pe packetEncoder) error { return nil } -func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) { +func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { if m.Version, err = pd.getInt16(); err != nil { return } @@ -36,13 +36,13 @@ func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) { return nil } -type GroupMemberAssignment struct { +type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 UserData []byte } -func (m *GroupMemberAssignment) encode(pe packetEncoder) error { +func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error { pe.putInt16(m.Version) if err := pe.putArrayLength(len(m.Topics)); err != nil { @@ -65,7 +65,7 @@ func (m *GroupMemberAssignment) encode(pe packetEncoder) error { return nil } -func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) { +func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) { if m.Version, err = pd.getInt16(); err != nil { return } diff --git a/group_members_test.go b/consumer_group_members_test.go similarity index 87% rename from group_members_test.go rename to consumer_group_members_test.go index 599502e0d..4213d81ee 100644 --- a/group_members_test.go +++ b/consumer_group_members_test.go @@ -27,8 +27,8 @@ var ( } ) -func TestGroupMemberMetadata(t *testing.T) { - meta := &GroupMemberMetadata{ +func TestConsumerGroupMemberMetadata(t *testing.T) { + meta := &ConsumerGroupMemberMetadata{ Version: 1, Topics: []string{"one", "two"}, UserData: []byte{0x01, 0x02, 0x03}, @@ -41,7 +41,7 @@ func TestGroupMemberMetadata(t *testing.T) { t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf) } - meta2 := new(GroupMemberMetadata) + meta2 := new(ConsumerGroupMemberMetadata) err = decode(buf, meta2) if err != nil { t.Error("Failed to decode data", err) @@ -50,8 +50,8 @@ func TestGroupMemberMetadata(t *testing.T) { } } -func TestGroupMemberAssignment(t *testing.T) { - amt := &GroupMemberAssignment{ +func TestConsumerGroupMemberAssignment(t *testing.T) { + amt := &ConsumerGroupMemberAssignment{ Version: 1, Topics: map[string][]int32{ "one": []int32{0, 2, 4}, @@ -67,7 +67,7 @@ func TestGroupMemberAssignment(t *testing.T) { t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf) } - amt2 := new(GroupMemberAssignment) + amt2 := new(ConsumerGroupMemberAssignment) err = decode(buf, amt2) if err != nil { t.Error("Failed to decode data", err) diff --git a/join_group_request.go b/join_group_request.go index 13812449e..5884d79d4 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -93,7 +93,7 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { r.GroupProtocols[name] = metadata } -func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *GroupMemberMetadata) error { +func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error { bin, err := encode(metadata) if err != nil { return err diff --git a/join_group_response.go b/join_group_response.go index 15e16cf1c..16f6b9b40 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -9,10 +9,10 @@ type JoinGroupResponse struct { Members map[string][]byte } -func (r *JoinGroupResponse) GetMembers() (map[string]GroupMemberMetadata, error) { - members := make(map[string]GroupMemberMetadata, len(r.Members)) +func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) { + members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members)) for id, bin := range r.Members { - meta := new(GroupMemberMetadata) + meta := new(ConsumerGroupMemberMetadata) if err := decode(bin, meta); err != nil { return nil, err } diff --git a/sync_group_request.go b/sync_group_request.go index b0faed2ef..031cf0f2f 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -85,7 +85,7 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment r.GroupAssignments[memberId] = memberAssignment } -func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *GroupMemberAssignment) error { +func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error { bin, err := encode(memberAssignment) if err != nil { return err diff --git a/sync_group_response.go b/sync_group_response.go index 406c27db5..49c86922d 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -5,8 +5,8 @@ type SyncGroupResponse struct { MemberAssignment []byte } -func (r *SyncGroupResponse) GetMemberAssignment() (*GroupMemberAssignment, error) { - assignment := new(GroupMemberAssignment) +func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) { + assignment := new(ConsumerGroupMemberAssignment) err := decode(r.MemberAssignment, assignment) return assignment, err } From 77de7e99a34cfad81dae28ff465caa17e2d1d7c9 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Thu, 24 Dec 2015 10:04:07 +0000 Subject: [PATCH 4/4] Remove group configuration. Group consumers will be implemented as a separate package --- config.go | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/config.go b/config.go index 30658c1b6..542c61172 100644 --- a/config.go +++ b/config.go @@ -54,20 +54,6 @@ type Config struct { RefreshFrequency time.Duration } - // Group is the namespace for group management properties - Group struct { - Session struct { - // The allowed session timeout for registered consumers (defaults to 30s). - // Must be within the allowed server range. - Timeout time.Duration - } - Heartbeat struct { - // Interval between each heartbeat (defaults to 3s). It should be no more - // than 1/3rd of the Group.Session.Timout setting - Interval time.Duration - } - } - // Producer is the namespace for configuration related to producing messages, // used by the Producer. Producer struct { @@ -226,9 +212,6 @@ func NewConfig() *Config { c.Metadata.Retry.Backoff = 250 * time.Millisecond c.Metadata.RefreshFrequency = 10 * time.Minute - c.Group.Session.Timeout = 30 * time.Second - c.Group.Heartbeat.Interval = 3 * time.Second - c.Producer.MaxMessageBytes = 1000000 c.Producer.RequiredAcks = WaitForLocal c.Producer.Timeout = 10 * time.Second @@ -276,12 +259,6 @@ func (c *Config) Validate() error { if c.Consumer.MaxWaitTime%time.Millisecond != 0 { Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.") } - if c.Group.Heartbeat.Interval%time.Millisecond != 0 { - Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.") - } - if c.Group.Session.Timeout%time.Millisecond != 0 { - Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.") - } if c.ClientID == "sarama" { Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.") } @@ -310,14 +287,6 @@ func (c *Config) Validate() error { return ConfigurationError("Metadata.RefreshFrequency must be >= 0") } - // validate the Group values - switch { - case c.Group.Heartbeat.Interval <= 0: - return ConfigurationError("Group.Heartbeat.Interval must be > 0") - case c.Group.Session.Timeout <= 0: - return ConfigurationError("Group.Session.Timeout must be > 0") - } - // validate the Producer values switch { case c.Producer.MaxMessageBytes <= 0: