Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition on OffsetManager shutdown #658

Merged
merged 1 commit into from
May 10, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type partitionOffsetManager struct {
offset int64
metadata string
dirty bool
clean chan none
clean sync.Cond
broker *brokerOffsetManager

errors chan *ConsumerError
Expand All @@ -189,11 +189,11 @@ func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32
parent: om,
topic: topic,
partition: partition,
clean: make(chan none),
errors: make(chan *ConsumerError, om.conf.ChannelBufferSize),
rebalance: make(chan none, 1),
dying: make(chan none),
}
pom.clean.L = &pom.lock

if err := pom.selectBroker(); err != nil {
return nil, err
Expand Down Expand Up @@ -331,11 +331,7 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string

if pom.offset == offset && pom.metadata == metadata {
pom.dirty = false

select {
case pom.clean <- none{}:
default:
}
pom.clean.Signal()
}
}

Expand All @@ -353,11 +349,10 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
func (pom *partitionOffsetManager) AsyncClose() {
go func() {
pom.lock.Lock()
dirty := pom.dirty
pom.lock.Unlock()
defer pom.lock.Unlock()

if dirty {
<-pom.clean
for pom.dirty {
pom.clean.Wait()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't seen sync.Cond used before. This looks like a pretty clean solution!

This seems to solve the issue with hanging, but could pom.dirty change to true after this goroutine finishes? I suppose this would only happen if MarkOffset is called after a Close, is it worth worrying about?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling anything after a Close or AsyncClose is user-error, not worth worrying about

}

close(pom.dying)
Expand Down