Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Force rebalance when partition consumers exit with an error #169

Merged
merged 1 commit into from
Sep 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions cluster.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package cluster

import (
"fmt"
"sort"
)

// Strategy for partition to consumer assignment.
type Strategy string

Expand All @@ -28,46 +23,3 @@ type Error struct {
Ctx string
error
}

// --------------------------------------------------------------------

// none is a zero-width value used for pure signaling channels and sets.
type none struct{}

// topicPartition identifies a single partition of a topic.
type topicPartition struct {
	Topic     string
	Partition int32
}

// String renders the pair as "<topic>-<partition>".
func (t *topicPartition) String() string {
	return fmt.Sprintf("%s-%d", t.Topic, t.Partition)
}

// offsetInfo holds a consumed offset together with its metadata string.
type offsetInfo struct {
	Offset   int64
	Metadata string
}

// NextOffset returns the stored offset when one is present (Offset > -1),
// otherwise the supplied fallback value.
func (i offsetInfo) NextOffset(fallback int64) int64 {
	if i.Offset < 0 {
		return fallback
	}
	return i.Offset
}

// int32Slice implements sort.Interface for a []int32.
type int32Slice []int32

func (s int32Slice) Len() int           { return len(s) }
func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }
func (s int32Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// Diff returns the elements of s that do not appear in o.
// o must be sorted in ascending order, as required by sort.Search;
// the result is nil when every element of s is present in o.
func (s int32Slice) Diff(o int32Slice) []int32 {
	var missing []int32
	limit := len(o)
	for _, v := range s {
		pos := sort.Search(limit, func(i int) bool { return o[i] >= v })
		if pos == limit || o[pos] != v {
			missing = append(missing, v)
		}
	}
	return missing
}
138 changes: 62 additions & 76 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,79 +236,61 @@ func (c *Consumer) mainLoop() {
default:
}

// Remember previous subscriptions
var notification *Notification
if c.client.config.Group.Return.Notifications {
notification = newNotification(c.subs.Info())
}
// Start next consume cycle
c.nextTick()
}
}

// Rebalance, fetch new subscriptions
subs, err := c.rebalance()
if err != nil {
c.rebalanceError(err, notification)
continue
}
func (c *Consumer) nextTick() {
// Remember previous subscriptions
var notification *Notification
if c.client.config.Group.Return.Notifications {
notification = newNotification(c.subs.Info())
}

// Start the heartbeat
hbStop, hbDone := make(chan none), make(chan none)
go c.hbLoop(hbStop, hbDone)
// Rebalance, fetch new subscriptions
subs, err := c.rebalance()
if err != nil {
c.rebalanceError(err, notification)
return
}

// Subscribe to topic/partitions
if err := c.subscribe(subs); err != nil {
close(hbStop)
<-hbDone
c.rebalanceError(err, notification)
continue
}
// Coordinate loops, make sure everything is
// stopped on exit
tomb := newLoopTomb()
defer tomb.Close()

// Update/issue notification with new claims
if c.client.config.Group.Return.Notifications {
notification.claim(subs)
c.handleNotification(notification)
}
// Start the heartbeat
tomb.Go(c.hbLoop)

// Start topic watcher loop
twStop, twDone := make(chan none), make(chan none)
go c.twLoop(twStop, twDone)
// Subscribe to topic/partitions
if err := c.subscribe(tomb, subs); err != nil {
c.rebalanceError(err, notification)
return
}

// Start consuming and committing offsets
cmStop, cmDone := make(chan none), make(chan none)
go c.cmLoop(cmStop, cmDone)
atomic.StoreInt32(&c.consuming, 1)
// Update/issue notification with new claims
if c.client.config.Group.Return.Notifications {
notification.claim(subs)
c.handleNotification(notification)
}

// Wait for signals
select {
case <-hbDone:
close(cmStop)
close(twStop)
<-cmDone
<-twDone
case <-twDone:
close(cmStop)
close(hbStop)
<-cmDone
<-hbDone
case <-cmDone:
close(twStop)
close(hbStop)
<-twDone
<-hbDone
case <-c.dying:
close(cmStop)
close(twStop)
close(hbStop)
<-cmDone
<-twDone
<-hbDone
return
}
// Start topic watcher loop
tomb.Go(c.twLoop)

// Start consuming and committing offsets
tomb.Go(c.cmLoop)
atomic.StoreInt32(&c.consuming, 1)

// Wait for signals
select {
case <-tomb.Dying():
case <-c.dying:
}
}

// heartbeat loop, triggered by the mainLoop
func (c *Consumer) hbLoop(stop <-chan none, done chan<- none) {
defer close(done)

func (c *Consumer) hbLoop(stopped <-chan none) {
ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
defer ticker.Stop()

Expand All @@ -323,16 +305,16 @@ func (c *Consumer) hbLoop(stop <-chan none, done chan<- none) {
c.handleError(&Error{Ctx: "heartbeat", error: err})
return
}
case <-stop:
case <-stopped:
return
case <-c.dying:
return
}
}
}

// topic watcher loop, triggered by the mainLoop
func (c *Consumer) twLoop(stop <-chan none, done chan<- none) {
defer close(done)

func (c *Consumer) twLoop(stopped <-chan none) {
ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
defer ticker.Stop()

Expand All @@ -352,16 +334,16 @@ func (c *Consumer) twLoop(stop <-chan none, done chan<- none) {
return
}
}
case <-stop:
case <-stopped:
return
case <-c.dying:
return
}
}
}

// commit loop, triggered by the mainLoop
func (c *Consumer) cmLoop(stop <-chan none, done chan<- none) {
defer close(done)

func (c *Consumer) cmLoop(stopped <-chan none) {
ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
defer ticker.Stop()

Expand All @@ -372,7 +354,9 @@ func (c *Consumer) cmLoop(stop <-chan none, done chan<- none) {
c.handleError(&Error{Ctx: "commit", error: err})
return
}
case <-stop:
case <-stopped:
return
case <-c.dying:
return
}
}
Expand Down Expand Up @@ -507,7 +491,7 @@ func (c *Consumer) rebalance() (map[string][]int32, error) {
}

// Performs the subscription, part of the mainLoop()
func (c *Consumer) subscribe(subs map[string][]int32) error {
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
// fetch offsets
offsets, err := c.fetchOffsets(subs)
if err != nil {
Expand All @@ -524,8 +508,8 @@ func (c *Consumer) subscribe(subs map[string][]int32) error {
wg.Add(1)

info := offsets[topic][partition]
go func(t string, p int32) {
if e := c.createConsumer(t, p, info); e != nil {
go func(topic string, partition int32) {
if e := c.createConsumer(tomb, topic, partition, info); e != nil {
mu.Lock()
err = e
mu.Unlock()
Expand Down Expand Up @@ -717,7 +701,7 @@ func (c *Consumer) leaveGroup() error {

// --------------------------------------------------------------------

func (c *Consumer) createConsumer(topic string, partition int32, info offsetInfo) error {
func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", c.memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))

// Create partitionConsumer
Expand All @@ -730,7 +714,9 @@ func (c *Consumer) createConsumer(topic string, partition int32, info offsetInfo
c.subs.Store(topic, partition, pc)

// Start partition consumer goroutine
go pc.Loop(c.messages, c.errors)
tomb.Go(func(stopper <-chan none) {
pc.Loop(stopper, c.messages, c.errors)
})

return nil
}
Expand Down
24 changes: 13 additions & 11 deletions partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type partitionConsumer struct {
state partitionState
mu sync.Mutex

closed bool
once sync.Once
dying, dead chan none
}

Expand All @@ -39,7 +39,7 @@ func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32
}, nil
}

func (c *partitionConsumer) Loop(messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
func (c *partitionConsumer) Loop(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
defer close(c.dead)

for {
Expand All @@ -50,6 +50,8 @@ func (c *partitionConsumer) Loop(messages chan<- *sarama.ConsumerMessage, errors
}
select {
case messages <- msg:
case <-stopper:
return
case <-c.dying:
return
}
Expand All @@ -59,25 +61,25 @@ func (c *partitionConsumer) Loop(messages chan<- *sarama.ConsumerMessage, errors
}
select {
case errors <- err:
case <-stopper:
return
case <-c.dying:
return
}
case <-stopper:
return
case <-c.dying:
return
}
}
}

func (c *partitionConsumer) Close() error {
if c.closed {
return nil
}

err := c.pcm.Close()
c.closed = true
close(c.dying)
// Close shuts the partition consumer down exactly once: the sync.Once closes
// the underlying sarama PartitionConsumer and signals Loop via the dying
// channel. It then blocks until Loop has exited (Loop closes dead on return),
// so the consumer is fully stopped when Close returns.
// Safe to call more than once; only the first call performs the teardown and
// can observe a non-nil error — later calls just wait and return nil, since
// the named return is only assigned inside the once.Do closure.
func (c *partitionConsumer) Close() (err error) {
	c.once.Do(func() {
		err = c.pcm.Close()
		close(c.dying)
	})
	<-c.dead

	return err
}

Expand Down
Loading