Merge pull request #196 from gmather/feature/195_rebalance_callback
feature 195: Add a rebalance callback to the processor.
frairon authored Jun 28, 2019
2 parents dfc3e62 + 33c9712 commit b00e782
Showing 3 changed files with 78 additions and 2 deletions.
17 changes: 17 additions & 0 deletions options.go
@@ -15,6 +15,9 @@ import (
// The partition storage shall be updated in the callback.
type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error

// RebalanceCallback is invoked when the processor receives a new partition assignment.
type RebalanceCallback func(a kafka.Assignment)

///////////////////////////////////////////////////////////////////////////////
// default values
///////////////////////////////////////////////////////////////////////////////
@@ -48,6 +51,11 @@ func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte)
return s.Set(key, value)
}


// DefaultRebalance is the default callback when a new partition assignment is received.
// DefaultRebalance can be used as the callback passed to WithRebalanceCallback.
func DefaultRebalance(a kafka.Assignment) {}

// DefaultHasher returns an FNV hasher builder to assign keys to partitions.
func DefaultHasher() func() hash.Hash32 {
return func() hash.Hash32 {
@@ -69,6 +77,7 @@ type poptions struct {
clientID string

updateCallback UpdateCallback
rebalanceCallback RebalanceCallback
partitionChannelSize int
hasher func() hash.Hash32
nilHandling NilHandling
@@ -227,6 +236,14 @@ func (opt *poptions) applyOptions(gg *GroupGraph, opts ...ProcessorOption) error
return nil
}

// WithRebalanceCallback sets the callback for when a new partition assignment
// is received. By default, this is an empty function.
func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
o.rebalanceCallback = cb
}
}

///////////////////////////////////////////////////////////////////////////////
// view options
///////////////////////////////////////////////////////////////////////////////
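A usage sketch of the new option, written in the package's own style; the broker address, the group graph gg, and the use of the standard log package are assumptions for illustration, not part of this change:

    // cb logs every new assignment; a kafka.Assignment maps partition IDs to offsets.
    cb := func(a kafka.Assignment) {
        for partition, offset := range a {
            log.Printf("assigned partition %d at offset %d", partition, offset)
        }
    }

    // gg is an existing *GroupGraph; brokers is a list of Kafka broker addresses.
    p, err := NewProcessor(brokers, gg, WithRebalanceCallback(cb))
    if err != nil {
        log.Fatalf("creating processor: %v", err)
    }
    _ = p // start it with p.Run(ctx) as usual

Processors that do not pass WithRebalanceCallback keep the no-op DefaultRebalance registered by the defaults in processor.go.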
4 changes: 4 additions & 0 deletions processor.go
@@ -64,6 +64,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)
WithUpdateCallback(DefaultUpdate),
WithPartitionChannelSize(defaultPartitionChannelSize),
WithStorageBuilder(storage.DefaultBuilder(DefaultProcessorStoragePath(gg.Group()))),
WithRebalanceCallback(DefaultRebalance),
},

// user-defined options (may overwrite default ones)
@@ -657,6 +658,9 @@ func (g *Processor) rebalance(errg *multierr.ErrGroup, ctx context.Context, part
errs := new(multierr.Errors)
g.opts.log.Printf("Processor: rebalancing: %+v", partitions)

// notify the registered callback of the new partition assignment
g.opts.rebalanceCallback(partitions)

for id := range partitions {
// create partition views
if err := g.createPartitionViews(errg, ctx, id); err != nil {
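The change above invokes the callback at the start of rebalance(), before the partition views for the new assignment are created, so an application can observe every assignment as it arrives. A minimal sketch of such a callback, assuming the standard sync package and hypothetical in-package names (assignMu, currentAsg, assignedPartitions):

    var (
        assignMu   sync.Mutex
        currentAsg kafka.Assignment
    )

    // trackAssignment records the latest assignment handed to the processor.
    func trackAssignment(a kafka.Assignment) {
        assignMu.Lock()
        defer assignMu.Unlock()
        currentAsg = a
    }

    // assignedPartitions lists the partitions of the most recent assignment.
    func assignedPartitions() []int32 {
        assignMu.Lock()
        defer assignMu.Unlock()
        ids := make([]int32, 0, len(currentAsg))
        for id := range currentAsg {
            ids = append(ids, id)
        }
        return ids
    }

Such a callback is wired in with WithRebalanceCallback(trackAssignment), just as DefaultRebalance is wired in by default.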
59 changes: 57 additions & 2 deletions processor_test.go
@@ -26,6 +26,7 @@ import (

var (
rawCodec = new(codec.Bytes)
emptyRebalanceCallback = func(a kafka.Assignment) {}
)

func nullStorageBuilder() storage.Builder {
@@ -88,7 +89,7 @@ func createFailedTopicManagerBuilder(tm kafka.TopicManager) kafka.TopicManagerBu
}
}

func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer, producer kafka.Producer, npar int) *Processor {
func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer, producer kafka.Producer, npar int, rcb func(a kafka.Assignment)) *Processor {
tm := mock.NewMockTopicManager(ctrl)

var partitions []int32
@@ -111,6 +112,7 @@ func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer,
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducerBuilder(createProducerBuilder(producer)),
WithPartitionChannelSize(0),
WithRebalanceCallback(rcb),
)
return p
}
@@ -1124,7 +1126,7 @@ func TestProcessor_StartStateless(t *testing.T) {
producer = mock.NewMockProducer(ctrl)
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessorStateless(ctrl, consumer, producer, 3)
p = createProcessorStateless(ctrl, consumer, producer, 3, emptyRebalanceCallback)
)

// -- expectations --
@@ -1440,6 +1442,59 @@ func TestProcessor_HasGetStateless(t *testing.T) {
ensure.True(t, value == nil)
}

func TestProcessor_RebalanceCallback(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var (
consumer = mock.NewMockConsumer(ctrl)
producer = mock.NewMockProducer(ctrl)
final = make(chan bool)
ch = make(chan kafka.Event)
asmt = (*kafka.Assignment)(&map[int32]int64{0: -1, 1: -1})
i = 0
eAsmt = []kafka.Assignment{{}, *asmt}
rcb = func(a kafka.Assignment) {
ensure.DeepEqual(t, a, eAsmt[i])
i += 1
}
p = createProcessorStateless(ctrl, consumer, producer, 3, rcb)
)

// -- expectations --
// 1. start
consumer.EXPECT().Subscribe(topOff).Return(nil)
consumer.EXPECT().Events().Return(ch).AnyTimes()
// 2. rebalance
consumer.EXPECT().AddGroupPartition(int32(0))
consumer.EXPECT().AddGroupPartition(int32(1))
// 3. stop processor
consumer.EXPECT().Close().Return(nil).Do(func() { close(ch) })
producer.EXPECT().Close().Return(nil)

// -- test --
ctx, cancel := context.WithCancel(context.Background())
// 1. start
go func() {
err := p.Run(ctx)
ensure.Nil(t, err)
close(final)
}()

// 2. rebalance
ensure.True(t, len(p.partitions) == 0)
ch <- asmt
err := syncWith(t, ch, -1, 1, 2)
ensure.Nil(t, err)
ensure.True(t, len(p.partitions) == 2)

// 3. stop processor
err = doTimed(t, func() {
cancel()
<-final
})
ensure.Nil(t, err)
}

// Example shows how to use a callback. For each partition of the topics, a new
// goroutine will be created. Topics should be co-partitioned (they should have
// the same number of partitions and be partitioned by the same key).
