kgo: avoid pointer reuse in metadata across producers & consumers
Previously, on metadata update, we would construct *topicPartitionData
(translating the metadata into our internal types) and then use that in some
fancy way to update our own state.

A change a while back separated the producer and consumer maps, fixing
some bugs and simplifying things. At the time, I considered it fine to
reuse the topicPartitionData -- I didn't realize the ramifications of
having things stored separately.
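
To see the hazard in isolation, here is a hedged sketch (hypothetical
Go, not kgo code): once two maps store the same pointer, a mutation made
through one map is visible through the other.

package main

import "fmt"

// partState is a hypothetical stand-in for the shared partition data.
type partState struct{ recBufsIdx int }

func main() {
	producerSide := map[string]*partState{}
	consumerSide := map[string]*partState{}

	shared := &partState{recBufsIdx: -1}
	producerSide["foo"] = shared
	consumerSide["foo"] = shared // the same pointer stored in both maps

	// An update intended only for the consumer side...
	consumerSide["foo"].recBufsIdx = 3

	// ...is visible through the producer side as well.
	fmt.Println(producerSide["foo"].recBufsIdx) // prints 3
}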

As it turns out, a specific sequence of events can result in a bug,
and this bug requires producing to and consuming from the same topic in
the same client:

* consume "foo", loading metadata successfully once
* produce to "foo", causing another metadata load because the maps are
separate
* this second metadata load fails with partition errors and causes
another metadata update
* third metadata update sees the leader has changed and transfers
leadership

This will cause a panic.
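
A minimal sketch of the client setup this requires (assuming a broker at
localhost:9092 and a topic named "foo"; the partition errors and leader
move in the later steps depend on broker behavior and are not reproduced
here):

package main

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// One client both consumes from and produces to "foo", so the producer
	// and consumer metadata maps each end up tracking the same topic.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		kgo.ConsumeTopics("foo"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Consuming first loads metadata for "foo" on the consumer side; the
	// short timeout just lets PollFetches return if nothing is produced yet.
	pollCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	cl.PollFetches(pollCtx)
	cancel()

	// Producing then forces another metadata load, this time for the
	// producer side, because the producer and consumer maps are separate.
	cl.Produce(context.Background(), &kgo.Record{Topic: "foo", Value: []byte("bar")}, nil)
}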

The second metadata load would add the new recBuf to the proper sink
(broker). Then the same *topicPartitionData from the metadata update is
used to update the consumer state (cursor & source). The
*topicPartitionData has a load error, so we instead copy the old
*topicPartitionData to the new one and save it. This copied the old
records, which had a recBufsIdx of -1. The producer was now using a
recBuf that had a recBufsIdx of -1.

There are two eventual scenarios here:

1) the next metadata update will re-add the recBuf to the sink because
the index is -1. This is forever wasteful, but not problematic: the
recBuf is on the same sink twice, and removing it will only remove the
first copy. The old sink will always attempt to drain and produce this
recBuf, and producing this will just result in a bunch of wasted
partition errors. The recBuf will also be drained by the proper sink,
so, nothing problematic here.

2) the next metadata update (step three above) will move the partition
to another broker. This will panic, because the recBufsIdx is -1, an
invalid index.

We fix this by instead mapping the kmsg structs into our own structs, and
then mapping those _again_ into producer- and consumer-specific structs.
Notably, the other half (cursor or records) is always nil. If we ever
have reuse problems in the future, we will get much more obvious panics.

Now, we absolutely ensure we do not share pointers between the consumer
side and the producer side.

Fixes #190
twmb committed Sep 1, 2022
1 parent 3191842 commit 0ca6478
Showing 2 changed files with 113 additions and 70 deletions.
6 changes: 3 additions & 3 deletions pkg/kgo/consumer_group.go
@@ -1222,12 +1222,12 @@ func (g *groupExternal) eachTopic(fn func(string)) {
})
}

func (g *groupExternal) updateLatest(meta map[string]*topicPartitionsData) {
func (g *groupExternal) updateLatest(meta map[string]*metadataTopic) {
g.cloned(func(tps map[string]int32) {
var rejoin bool
for t, ps := range tps {
latest := meta[t]
if latest == nil || latest.loadErr != nil {
latest, exists := meta[t]
if !exists || latest.loadErr != nil {
continue
}
if psLatest := int32(len(latest.partitions)); psLatest != ps {
177 changes: 110 additions & 67 deletions pkg/kgo/metadata.go
@@ -409,15 +409,95 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
return retryWhy, nil
}

// We use a special structure to represent metadata before we *actually* convert
// it to topicPartitionsData. This helps avoid any pointer reuse problems
// because we want to keep the client's producer and consumer maps completely
// independent. If we just returned map[string]*topicPartitionsData, we could
// end up in some really weird pointer reuse scenario that ultimately results
// in a bug.
//
// See #190 for more details, as well as the commit message introducing this.
type metadataTopic struct {
loadErr error
isInternal bool
partitions []metadataPartition
}

func (mt *metadataTopic) newPartitions(cl *Client, isProduce bool) *topicPartitionsData {
n := len(mt.partitions)
ps := &topicPartitionsData{
loadErr: mt.loadErr,
isInternal: mt.isInternal,
partitions: make([]*topicPartition, 0, n),
writablePartitions: make([]*topicPartition, 0, n),
}
for i := range mt.partitions {
p := mt.partitions[i].newPartition(cl, isProduce)
ps.partitions = append(ps.partitions, p)
if p.loadErr == nil {
ps.writablePartitions = append(ps.writablePartitions, p)
}
}
return ps
}

type metadataPartition struct {
topic string
topicID [16]byte
partition int32
loadErr int16
leader int32
leaderEpoch int32
sns sinkAndSource
}

func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicPartition {
td := topicPartitionData{
leader: mp.leader,
leaderEpoch: mp.leaderEpoch,
}
p := &topicPartition{
loadErr: kerr.ErrorForCode(mp.loadErr),
topicPartitionData: td,
}
if isProduce {
p.records = &recBuf{
cl: cl,
topic: mp.topic,
partition: mp.partition,
maxRecordBatchBytes: cl.maxRecordBatchBytesForTopic(mp.topic),
recBufsIdx: -1,
failing: mp.loadErr != 0,
sink: mp.sns.sink,
topicPartitionData: td,
}
} else {
p.cursor = &cursor{
topic: mp.topic,
topicID: mp.topicID,
partition: mp.partition,
keepControl: cl.cfg.keepControl,
cursorsIdx: -1,
source: mp.sns.source,
topicPartitionData: td,
cursorOffset: cursorOffset{
offset: -1, // required to not consume until needed
lastConsumedEpoch: -1, // required sentinel
},
}
}
return p
}

// fetchTopicMetadata fetches metadata for all reqTopics and returns new
// topicPartitionsData for each topic.
func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*topicPartitionsData, error) {
func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*metadataTopic, error) {
_, meta, err := cl.fetchMetadataForTopics(cl.ctx, all, reqTopics)
if err != nil {
return nil, err
}

topics := make(map[string]*topicPartitionsData, len(meta.Topics))
topics := make(map[string]*metadataTopic, len(meta.Topics))

// Even if metadata returns a leader epoch, we do not use it unless we
// can validate it per OffsetForLeaderEpoch. Some brokers may have an
@@ -432,21 +512,21 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
}
topic := *topicMeta.Topic

parts := &topicPartitionsData{
loadErr: kerr.ErrorForCode(topicMeta.ErrorCode),
isInternal: topicMeta.IsInternal,
partitions: make([]*topicPartition, 0, len(topicMeta.Partitions)),
writablePartitions: make([]*topicPartition, 0, len(topicMeta.Partitions)),
mt := &metadataTopic{
loadErr: kerr.ErrorForCode(topicMeta.ErrorCode),
isInternal: topicMeta.IsInternal,
partitions: make([]metadataPartition, 0, len(topicMeta.Partitions)),
}
topics[topic] = parts

if parts.loadErr != nil {
topics[topic] = mt

if mt.loadErr != nil {
continue
}

// This 249 limit is in Kafka itself, we copy it here to rely on it while producing.
if len(topic) > 249 {
parts.loadErr = fmt.Errorf("invalid long topic name of (len %d) greater than max allowed 249", len(topic))
mt.loadErr = fmt.Errorf("invalid long topic name of (len %d) greater than max allowed 249", len(topic))
continue
}

@@ -458,12 +538,12 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
})
for i := range topicMeta.Partitions {
if got := topicMeta.Partitions[i].Partition; got != int32(i) {
parts.loadErr = fmt.Errorf("kafka did not reply with a comprehensive set of partitions for a topic; we expected partition %d but saw %d", i, got)
mt.loadErr = fmt.Errorf("kafka did not reply with a comprehensive set of partitions for a topic; we expected partition %d but saw %d", i, got)
break
}
}

if parts.loadErr != nil {
if mt.loadErr != nil {
continue
}

@@ -473,59 +553,25 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
if meta.Version < 7 || !useLeaderEpoch {
leaderEpoch = -1
}

p := &topicPartition{
loadErr: kerr.ErrorForCode(partMeta.ErrorCode),
topicPartitionData: topicPartitionData{
leader: partMeta.Leader,
leaderEpoch: leaderEpoch,
},

records: &recBuf{
cl: cl,

topic: topic,
partition: partMeta.Partition,

maxRecordBatchBytes: cl.maxRecordBatchBytesForTopic(topic),

recBufsIdx: -1,
failing: partMeta.ErrorCode != 0,
},

cursor: &cursor{
topic: topic,
topicID: topicMeta.TopicID,
partition: partMeta.Partition,
keepControl: cl.cfg.keepControl,
cursorsIdx: -1,

cursorOffset: cursorOffset{
offset: -1, // required to not consume until needed
lastConsumedEpoch: -1, // required sentinel
},
},
mp := metadataPartition{
topic: topic,
topicID: topicMeta.TopicID,
partition: partMeta.Partition,
loadErr: partMeta.ErrorCode,
leader: partMeta.Leader,
leaderEpoch: leaderEpoch,
}

// Any partition that has a load error uses the first
// seed broker as a leader. This ensures that every
// record buffer and every cursor can use a sink or
// source.
if p.loadErr != nil {
p.leader = unknownSeedID(0)
if mp.loadErr != 0 {
mp.leader = unknownSeedID(0) // ensure every records & cursor can use a sink or source
}

p.cursor.topicPartitionData = p.topicPartitionData
p.records.topicPartitionData = p.topicPartitionData

cl.sinksAndSourcesMu.Lock()
sns, exists := cl.sinksAndSources[p.leader]
sns, exists := cl.sinksAndSources[mp.leader]
if !exists {
sns = sinkAndSource{
sink: cl.newSink(p.leader),
source: cl.newSource(p.leader),
sink: cl.newSink(mp.leader),
source: cl.newSource(mp.leader),
}
cl.sinksAndSources[p.leader] = sns
cl.sinksAndSources[mp.leader] = sns
}
for _, replica := range partMeta.Replicas {
if replica < 0 {
@@ -539,13 +585,8 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
}
}
cl.sinksAndSourcesMu.Unlock()
p.records.sink = sns.sink
p.cursor.source = sns.source

parts.partitions = append(parts.partitions, p)
if p.loadErr == nil {
parts.writablePartitions = append(parts.writablePartitions, p)
}
mp.sns = sns
mt.partitions = append(mt.partitions, mp)
}
}

@@ -559,14 +600,16 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
func (cl *Client) mergeTopicPartitions(
topic string,
l *topicPartitions,
r *topicPartitionsData,
mt *metadataTopic,
isProduce bool,
reloadOffsets *listOrEpochLoads,
stopConsumerSession func(),
retryWhy *multiUpdateWhy,
) {
lv := *l.load() // copy so our field writes do not collide with reads

r := mt.newPartitions(cl, isProduce)

// Producers must store the update through a special function that
// manages unknown topic waiting, whereas consumers can just simply
// store the update.
