Skip to content

Commit

Permalink
consumer & consumer group: small redux, bug fixes
Browse files Browse the repository at this point in the history
ATOMIC STORES
=============

This commit switches the consumer type to be stored in an atomic value,
rather than a uint8 type that specifies which pointer to use guarded by
a mutex.

This switch fundamentally arises from trying to unblock metadata updates
while a group consumer is leaving the group. Previously, we had to grab
the consumer lock to check the consumer type to check if we were
consuming with regex. We avoid that now.

This actually makes a bunch of other areas simpler as well -- many
places needed the group consumer to do some logic on the group consumer
directly. Previously, we had to grab the consumer lock, and for
simplicity we held it through the function. Holding it was unnecessary,
and now we avoid grabbing the lock at all.

Anything that sets the consumer value grabs a new dedicated assignMu.
The dedicated assignMu allows us to unblock a clean group leave, which
may (in a shortly incoming commit) grab the consumer mu to assign
partitions on revoke.

We do not have to worry about TOCTOU: the guarantee is things work in
order. If a person concurrently modifies something, they may change the
outcome of stuff that was set into sequence by original events, but the
outcome is still sound according to our client. Particularly, a later
metadata update will trigger the right sequence for the new assignment.

Same type of logic with offset setting, but people should not be doing
that concurrently with assigning and whatnot.

UPDATES & LOCK ORDERING FIXES
=============================

This is the bulk of this commit that mostly fixes some lock orderings
and missing locks.

This should fix the panic in #24 by at least logging on when it would be
detected and continuing, however the bug itself is still a mystery. The
debug logs about what the balance results were should help, though, if
this crops up again.

There are a few lock ordering fixes in here which are now documented
extensively. Notably, PollFetches needs the consumer mu, and there is a
huge reason as to why.

The prerevoke and revoke logic, and how we ensure things are done before
returning sometimes, is all more extensively documented.

Lastly, all instances of assignPartitions is now properly guarded by the
consumer mutex. Prior, some instances were not.
  • Loading branch information
twmb committed Mar 21, 2021
1 parent b531098 commit 938651e
Show file tree
Hide file tree
Showing 7 changed files with 533 additions and 329 deletions.
19 changes: 19 additions & 0 deletions pkg/kgo/atomic_maybe_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@ package kgo

import "sync/atomic"

// a helper type for some places
type atomicBool uint32

func (b *atomicBool) set(v bool) {
if v {
atomic.StoreUint32((*uint32)(b), 1)
} else {
atomic.StoreUint32((*uint32)(b), 0)
}
}

func (b *atomicBool) get() bool {
v := atomic.LoadUint32((*uint32)(b))
if v == 1 {
return true
}
return false
}

const (
stateUnstarted = iota
stateWorking
Expand Down
20 changes: 6 additions & 14 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ func NewClient(opts ...Opt) (*Client, error) {
metadone: make(chan struct{}),
}
cl.producer.init()
cl.consumer.cl = cl
cl.consumer.sourcesReadyCond = sync.NewCond(&cl.consumer.sourcesReadyMu)
cl.consumer.init(cl)
cl.topics.Store(make(map[string]*topicPartitions))
cl.metawait.init()

Expand Down Expand Up @@ -403,19 +402,12 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {

// Close leaves any group and closes all connections and goroutines.
func (cl *Client) Close() {
// First, kill the consumer. Setting dead to true and then assigning
// nothing will
// 1) invalidate active fetches
// 2) ensure consumptions are unassigned, stopping all source filling
// 3) ensures no more assigns can happen
cl.consumer.mu.Lock()
if cl.consumer.dead { // client already closed
cl.consumer.mu.Unlock()
return
// First, kill the consumer. This waits for the consumer to unset
// gracefully, ensuring we leave groups properly, and then stores the
// dead consumer, meaning no more assigns can happen.
if wasDead := cl.consumer.kill(); wasDead {
return // client was already closed
}
cl.consumer.dead = true
cl.consumer.mu.Unlock()
cl.AssignPartitions()

// Now we kill the client context and all brokers, ensuring all
// requests fail. This will finish all producer callbacks and
Expand Down
200 changes: 157 additions & 43 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,33 @@ func (o Offset) At(at int64) Offset {
return o
}

type consumerType uint8

const (
consumerTypeUnset consumerType = iota
consumerTypeDirect
consumerTypeGroup
)

type consumer struct {
cl *Client

// mu guards this block specifically
mu sync.Mutex
group *groupConsumer
direct *directConsumer
typ consumerType
// assignMu is grabbed when setting v (AssignGroup, AssignDirect, or Close)
// mu is grabbed when
// - polling fetches, for quickly draining sources / updating group uncommitted
// - calling assignPartitions (group / direct updates)
//
// v is atomic for non-locking reads in a few instances where that
// is preferrable / allowed.
assignMu sync.Mutex
mu sync.Mutex
v atomic.Value // *consumerValue

// On metadata update, if the consumer is set (direct or group), the
// client begins a goroutine that updates the consumer kind's
// assignments.
//
// This is done in a goroutine to not block the metadata loop, because
// the update **could** wait on a group consumer leaving if a
// concurrent AssignGroup is called (very low risk vector).
//
// The update realistically should be instantaneous, but if it is slow,
// some metadata updates could pile up. We loop with our atomic work
// loop, which collapses repeated updates into one extra update, so we
// loop as little as necessary.
outstandingMetadataUpdates workLoop

// sessionChangeMu is grabbed when a session is stopped and held through
// when a session can be started again. The sole purpose is to block an
Expand Down Expand Up @@ -122,16 +133,79 @@ func (u *usedCursors) use(c *cursor) {
(*u)[c] = struct{}{}
}

// unset, called under the consumer mu, transitions the group to the unset
var consumerUnsetSentinel = new(consumerValue)
var consumerDeadSentinel = new(consumerValue)

func init() {
consumerUnsetSentinel.v = &consumerUnsetSentinel
consumerDeadSentinel.v = &consumerDeadSentinel
}

type consumerValue struct {
// Options:
// - consumerUnsetSentinel
// - *directConsumer
// - *groupConsumer
// - consumerDeadSentinel
v interface{}
}

func (c *consumer) init(cl *Client) {
c.cl = cl
c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu)
c.v.Store(consumerUnsetSentinel)
}

func (c *consumer) loadKind() interface{} { return c.v.Load().(*consumerValue).v }
func (c *consumer) loadGroup() (*groupConsumer, bool) {
g, ok := c.loadKind().(*groupConsumer)
return g, ok
}
func (c *consumer) loadDirect() (*directConsumer, bool) {
d, ok := c.loadKind().(*directConsumer)
return d, ok
}

func (c *consumer) storeDirect(d *directConsumer) { c.v.Store(&consumerValue{v: d}) } // while locked
func (c *consumer) storeGroup(g *groupConsumer) { c.v.Store(&consumerValue{v: g}) } // while locked

func (c *consumer) kill() (wasDead bool) {
c.assignMu.Lock()
wasDead, wait := c.unset()
c.v.Store(consumerDeadSentinel)
c.assignMu.Unlock()

wait()
return wasDead
}

func (c *consumer) unsetAndWait() (wasDead bool) {
wasDead, wait := c.unset()
wait()
return wasDead
}

// unset, called under the assign mu, transitions the group to the unset
// state, invalidating old assignments and leaving a group if it was in one.
func (c *consumer) unset() {
//
// This returns a function to wait for a group to be left, if in one.
func (c *consumer) unset() (wasDead bool, wait func()) {
c.mu.Lock()
defer c.mu.Unlock()

c.assignPartitions(nil, assignInvalidateAll)
if c.typ == consumerTypeGroup {
c.group.leave()

prior := c.loadKind()
wasDead = prior == consumerDeadSentinel
if !wasDead {
c.v.Store(consumerUnsetSentinel)
}

wait = func() {}
if g, ok := prior.(*groupConsumer); ok {
wait = g.leave()
}
c.typ = consumerTypeUnset
c.direct = nil
c.group = nil
return wasDead, wait
}

// addSourceReadyForDraining tracks that a source needs its buffered fetch
Expand Down Expand Up @@ -171,12 +245,34 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {

var fetches Fetches
fill := func() {
// A group can grab the consumer lock then the group mu and
// assign partitions. The group mu is grabbed to update its
// uncommitted map. Assigning partitions clears sources ready
// for draining.
//
// We need to grab the consumer mu to ensure proper lock
// ordering and prevent lock inversion. Polling fetches also
// updates the group's uncommitted map; if we do not grab the
// consumer mu at the top, we have a problem: without the lock,
// we could have grabbed some sources, then a group assigned,
// and after the assign, we update uncommitted with fetches
// from the old assignment
c.mu.Lock()
defer c.mu.Unlock()

c.sourcesReadyMu.Lock()
defer c.sourcesReadyMu.Unlock()
for _, ready := range c.sourcesReadyForDraining {
fetches = append(fetches, ready.takeBuffered())
}
c.sourcesReadyForDraining = nil
realFetches := fetches
fetches = append(fetches, c.fakeReadyForDraining...)
c.fakeReadyForDraining = nil
c.sourcesReadyMu.Unlock()

if len(realFetches) == 0 {
return
}

// Before returning, we want to update our uncommitted. If we
// updated after, then we could end up with weird interactions
Expand All @@ -187,17 +283,9 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
// session to start. If we returned stale fetches that did not
// have their uncommitted offset tracked, then we would allow
// duplicates.
//
// We grab the consumer mu because a concurrent client close
// could happen.
c.mu.Lock()
if c.typ == consumerTypeGroup && len(fetches) > 0 {
c.group.updateUncommitted(fetches)
if g, ok := c.loadGroup(); ok {
g.updateUncommitted(realFetches)
}
c.mu.Unlock()

fetches = append(fetches, c.fakeReadyForDraining...)
c.fakeReadyForDraining = nil
}

fill()
Expand Down Expand Up @@ -354,7 +442,12 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
clientTopics := c.cl.loadTopics()
for topic, partitions := range assignments {

topicParts := clientTopics[topic].load() // must be non-nil, which is ensured in Assign<> or in metadata when consuming as regex
topicPartitions := clientTopics[topic] // should be non-nil
if topicPartitions == nil {
c.cl.cfg.logger.Log(LogLevelError, "BUG! consumer was assigned topic that we did not ask for in AssignGroup nor AssignDirect, skipping!", "topic", topic)
continue
}
topicParts := topicPartitions.load()

for partition, offset := range partitions {
// First, if the request is exact, get rid of the relative
Expand Down Expand Up @@ -410,23 +503,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
}

func (c *consumer) doOnMetadataUpdate() {
c.mu.Lock()
defer c.mu.Unlock()

switch c.typ {
case consumerTypeUnset:
switch c.loadKind().(type) {
case *directConsumer:
case *groupConsumer:
default:
return
case consumerTypeDirect:
c.assignPartitions(c.direct.findNewAssignments(c.cl.loadTopics()), assignWithoutInvalidating)
case consumerTypeGroup:
c.group.findNewAssignments(c.cl.loadTopics())
}

go c.loadSession().doOnMetadataUpdate()
// See the comment on the outstandingMetadataUpdates field for why this
// block below.
if c.outstandingMetadataUpdates.maybeBegin() {
doUpdate := func() {
switch t := c.loadKind().(type) {
case *directConsumer:
if new := t.findNewAssignments(c.cl.loadTopics()); len(new) > 0 {
c.mu.Lock()
c.assignPartitions(new, assignWithoutInvalidating)
c.mu.Unlock()
}
case *groupConsumer:
t.findNewAssignments(c.cl.loadTopics())
}

go c.loadSession().doOnMetadataUpdate()
}

go func() {
again := true
for again {
doUpdate()
again = c.outstandingMetadataUpdates.maybeFinish(false)
}
}()
}

}

func (s *consumerSession) doOnMetadataUpdate() {
if s == nil { // no session started yet
if s == nil || s == noConsumerSession { // no session started yet
return
}

Expand Down
13 changes: 8 additions & 5 deletions pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ type directConsumer struct {
// This takes ownership of any assignments.
func (cl *Client) AssignPartitions(opts ...DirectConsumeOpt) {
c := &cl.consumer
c.mu.Lock()
defer c.mu.Unlock()

c.unset()
c.assignMu.Lock()
defer c.assignMu.Unlock()

if wasDead := c.unsetAndWait(); wasDead {
return
}

d := &directConsumer{
topics: make(map[string]Offset),
Expand All @@ -80,8 +83,8 @@ func (cl *Client) AssignPartitions(opts ...DirectConsumeOpt) {
if len(d.topics) == 0 && len(d.partitions) == 0 || c.dead {
return
}
c.typ = consumerTypeDirect
c.direct = d

c.storeDirect(d)

defer cl.triggerUpdateMetadata()

Expand Down
Loading

0 comments on commit 938651e

Please sign in to comment.