Skip to content

Commit

Permalink
kgo.Client: add GroupMetadata, ProducerID, PartitionLeader functions
Browse files Browse the repository at this point in the history
These helper functions can return the client's internal state. Only
ProducerID waits, the others just return the current state. These
functions can be used to build manual commit messages.

Closes #175.
  • Loading branch information
twmb committed Aug 22, 2022
1 parent 9497cf3 commit ac2f97b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 2 deletions.
15 changes: 15 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ func (cl *Client) LeaveGroup() {
wait() // wait after we unlock
}

// GroupMetadata returns the current group member ID and generation, or an
// empty string and -1 if not in the group.
func (cl *Client) GroupMetadata() (string, int32) {
g := cl.consumer.g
if g == nil {
return "", -1
}
g.mu.Lock()
defer g.mu.Unlock()
if g.memberID == "" {
return "", -1
}
return g.memberID, g.generation
}

func (c *consumer) initGroup() {
ctx, cancel := context.WithCancel(c.cl.ctx)
g := &groupConsumer{
Expand Down
40 changes: 38 additions & 2 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,42 @@ func (cl *Client) ForceMetadataRefresh() {
cl.triggerUpdateMetadataNow("from user ForceMetadataRefresh")
}

// PartitionLeader returns the given topic partition's leader, leader epoch and
// load error. This returns -1, -1, nil if the partition has not been loaded.
func (cl *Client) PartitionLeader(topic string, partition int32) (leader, leaderEpoch int32, err error) {
if partition < 0 {
return -1, -1, errors.New("invalid negative partition")
}

var t *topicPartitions

m := cl.producer.topics.load()
if len(m) > 0 {
t = m[topic]
}
if t == nil {
if cl.consumer.g != nil {
if m = cl.consumer.g.tps.load(); len(m) > 0 {
t = m[topic]
}
} else if cl.consumer.d != nil {
if m = cl.consumer.d.tps.load(); len(m) > 0 {
t = m[topic]
}
}
if t == nil {
return -1, -1, nil
}
}

tv := t.load()
if len(tv.partitions) <= int(partition) {
return -1, -1, tv.loadErr
}
p := tv.partitions[partition]
return p.leader, p.leaderEpoch, p.loadErr
}

// waitmeta returns immediately if metadata was updated within the last second,
// otherwise this waits for up to wait for a metadata update to complete.
func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string) {
Expand Down Expand Up @@ -260,11 +296,11 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
)
c := &cl.consumer
switch {
case c.d != nil:
tpsConsumer = c.d.tps
case c.g != nil:
tpsConsumer = c.g.tps
groupExternal = c.g.loadExternal()
case c.d != nil:
tpsConsumer = c.d.tps
}

if !all {
Expand Down
25 changes: 25 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,31 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart
}
}

// ProducerID returns, loading if necessary, the current producer ID and epoch.
// This returns an error if the producer ID could not be loaded, if the
// producer ID has fatally errored, or if the context is canceled.
func (cl *Client) ProducerID(ctx context.Context) (int64, int16, error) {
var (
id int64
epoch int16
err error

done = make(chan struct{})
)

go func() {
defer close(done)
id, epoch, err = cl.producerID()
}()

select {
case <-ctx.Done():
return 0, 0, ctx.Err()
case <-done:
return id, epoch, err
}
}

type producerID struct {
id int64
epoch int16
Expand Down

0 comments on commit ac2f97b

Please sign in to comment.