From 53285c9f5e7715066c3b647602b9638abdfcde26 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Fri, 6 May 2016 16:13:59 -0400 Subject: [PATCH] Fix race condition on OffsetManager shutdown If `updateCommitted` ran between the `Unlock` call and the read from `clean` then we might miss the transition message and hang. Instead, use a condition value from the `sync` package which does the right thing. --- offset_manager.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/offset_manager.go b/offset_manager.go index 15ddecbc8..ebfd8b403 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -176,7 +176,7 @@ type partitionOffsetManager struct { offset int64 metadata string dirty bool - clean chan none + clean sync.Cond broker *brokerOffsetManager errors chan *ConsumerError @@ -189,11 +189,11 @@ func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32 parent: om, topic: topic, partition: partition, - clean: make(chan none), errors: make(chan *ConsumerError, om.conf.ChannelBufferSize), rebalance: make(chan none, 1), dying: make(chan none), } + pom.clean.L = &pom.lock if err := pom.selectBroker(); err != nil { return nil, err @@ -331,11 +331,7 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string if pom.offset == offset && pom.metadata == metadata { pom.dirty = false - - select { - case pom.clean <- none{}: - default: - } + pom.clean.Signal() } } @@ -353,11 +349,10 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) { func (pom *partitionOffsetManager) AsyncClose() { go func() { pom.lock.Lock() - dirty := pom.dirty - pom.lock.Unlock() + defer pom.lock.Unlock() - if dirty { - <-pom.clean + for pom.dirty { + pom.clean.Wait() } close(pom.dying)