Skip to content

Commit

Permalink
kgo source: use the proper topic-to-id map when forgetting topics
Browse files Browse the repository at this point in the history
Adding topics to a session needs to use the fetch request's topic2id
map (which then promotes IDs into the session t2id map).

Importantly, and previously this was wrong / not the case: removing
topics from a session needs to use the session's t2id map. The topic
does not exist in the request's topic2id map, because well, it's being
forgotten. It's not in the fetch request.

Adds some massive comments explaining the situation.

Closes #620.
  • Loading branch information
twmb committed Dec 3, 2023
1 parent a6d10d4 commit 1b6a721
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,26 @@ type fetchRequest struct {
torder []string // order of topics to write
porder map[string][]int32 // per topic, order of partitions to write

// topic2id and id2topic track bidirectional lookup of topics and IDs
// that are being added to *this* specific request. topic2id slightly
// duplicates the map t2id in the fetch session, but t2id is different
// in that t2id tracks IDs in use from all prior requests -- and,
// importantly, t2id is cleared of IDs that are no longer used (see
// ForgottenTopics).
//
// We need to have both a session t2id map and a request t2id map:
//
// * The session t2id is what we use when creating forgotten topics.
// If we are forgetting a topic, the ID is not in the req t2id.
//
// * The req topic2id is used for adding to the session t2id. When
// building a request, if the id is in req.topic2id but not
// session.t2id, we promote the ID into the session map.
//
// Lastly, id2topic is used when handling the response, as our reverse
// lookup from the ID back to the topic (and then we work with the
// topic name only). There is no equivalent in the session because
// there is no need for the id2topic lookup ever in the session.
topic2id map[string][16]byte
id2topic map[[16]byte]string

Expand Down Expand Up @@ -2067,7 +2087,7 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
if forgottenTopic == nil {
t := kmsg.NewFetchRequestForgottenTopic()
t.Topic = topic
t.TopicID = f.topic2id[topic]
t.TopicID = f.session.t2id[topic]
req.ForgottenTopics = append(req.ForgottenTopics, t)
forgottenTopic = &req.ForgottenTopics[len(req.ForgottenTopics)-1]
}
Expand All @@ -2079,7 +2099,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
id := f.session.t2id[topic]
delete(f.session.t2id, topic)
// If we deleted a topic that was missing an ID, then we clear the
// previous disableIDs state and potentially reenable it.
// previous disableIDs state. We potentially *reenable* disableIDs
// if any remaining topics in our session are also missing their ID.
var noID [16]byte
if id == noID {
f.session.disableIDs = false
Expand Down

0 comments on commit 1b6a721

Please sign in to comment.