Manual commit multiple partitions consumer #137

Merged 6 commits on Mar 1, 2022
23 changes: 23 additions & 0 deletions examples/goroutine_per_partition_consuming_manual_commit/README.md
@@ -0,0 +1,23 @@
Group consuming, using a goroutine per partition and manual committing
===

This example consumes from a group and starts a goroutine per partition to
process records concurrently, committing manually after doing some work on
each slice of records. This type of code may be useful if processing each
record per partition is slow, such that processing records in a single
`PollRecords` loop is not as fast as you want it to be.

This is just one example of how to process messages concurrently. A simpler
solution would be just to have a group of record consumers selecting from a
channel, and to send all records down this channel in your `PollRecords` loop.
However, that simple solution does not preserve per-partition ordering.

## Flags

`-b` can be specified to override the default localhost:9092 broker to any
comma delimited set of brokers.

`-t` specifies the topic to consume (required)

`-g` specifies the group to consume in (required)

15 changes: 15 additions & 0 deletions examples/goroutine_per_partition_consuming_manual_commit/go.mod
@@ -0,0 +1,15 @@
module goroutine_per_partition_consuming_manual_commit

go 1.17

replace github.com/twmb/franz-go => ../../

require github.com/twmb/franz-go v1.3.4

require (
github.com/klauspost/compress v1.14.4 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20220222044056-99b4da42cf4b // indirect
github.com/twmb/go-rbtree v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 // indirect
)
44 changes: 44 additions & 0 deletions examples/goroutine_per_partition_consuming_manual_commit/go.sum
@@ -0,0 +1,44 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20220222044056-99b4da42cf4b h1:K7ICUMoexUPk3aFQOgydoeRS2nCZvTcKMm93lkGDGPc=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20220222044056-99b4da42cf4b/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI=
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
161 changes: 161 additions & 0 deletions examples/goroutine_per_partition_consuming_manual_commit/main.go
@@ -0,0 +1,161 @@
package main

import (
"context"
"flag"
"fmt"
"math/rand"
"strings"
"sync"
"time"

"github.com/twmb/franz-go/pkg/kgo"
)

type pconsumer struct {
quit chan struct{}
done chan struct{}
recs chan []*kgo.Record
}

var (
brokers = flag.String("b", "", "comma delimited brokers to consume from")
topic = flag.String("t", "", "topic to consume")
group = flag.String("g", "", "group to consume in")
)

func (pc *pconsumer) consume(topic string, partition int32, cl *kgo.Client) {
fmt.Printf("Starting consume for t %s p %d\n", topic, partition)
for {
select {
case <-pc.quit:
pc.done <- struct{}{}
fmt.Printf("Closing consume for t %s p %d\n", topic, partition)
return
case recs := <-pc.recs:
// Mimic work to happen before committing the records
time.Sleep(time.Duration(rand.Intn(150)+100) * time.Millisecond)
fmt.Printf("Some sort of work done, about to commit t %s p %d\n", topic, partition)
err := cl.CommitRecords(context.Background(), recs...)
twmb (Owner) commented on Mar 1, 2022:

I wonder if you would prefer MarkCommitRecords here, and AutoCommitMarks? Perhaps I need another, CommitMarks; CommitUncommittedOffsets does exactly this.

This would allow more batching for your commits, vs. each commit here being blocking.

Not sure, what do you think? This is fine too.

brunsgaard (Contributor) commented on Mar 1, 2022:

In our "real" use case we don't commit that often: we roll up 10,000 messages over a period of minutes, so the commit per partition is called infrequently. For this example I like that we stick to a commit per `[]*kgo.Record`; if you prefer the mark-commit strategy for this example, we will do that. We are aware that committing too often is an undesirable pattern.

if err != nil {
fmt.Printf("Error when committing offsets to kafka err: %v t: %s p: %d offset %d\n", err, topic, partition, recs[len(recs)-1].Offset+1)
}
}
}
}

type splitConsume struct {
mu sync.Mutex // guards assigning / losing vs. polling
consumers map[string]map[int32]pconsumer
}

func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) {
s.mu.Lock()
defer s.mu.Unlock()
for topic, partitions := range assigned {
if s.consumers[topic] == nil {
s.consumers[topic] = make(map[int32]pconsumer)
}
for _, partition := range partitions {
pc := pconsumer{
quit: make(chan struct{}),
done: make(chan struct{}),
recs: make(chan []*kgo.Record),
}
s.consumers[topic][partition] = pc
go pc.consume(topic, partition, cl)
}
}
}

func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][]int32) {
s.mu.Lock()
defer s.mu.Unlock()
for topic, partitions := range lost {
ptopics := s.consumers[topic]
for _, partition := range partitions {
pc := ptopics[partition]
delete(ptopics, partition)
if len(ptopics) == 0 {
delete(s.consumers, topic)
}
close(pc.quit)
fmt.Printf("Waiting for work to finish t %s p %d\n", topic, partition)
<-pc.done
}
}
}

func main() {
rand.Seed(time.Now().Unix())
flag.Parse()

if len(*group) == 0 {
fmt.Println("missing required group")
return
}
if len(*topic) == 0 {
fmt.Println("missing required topic")
return
}

s := &splitConsume{
consumers: make(map[string]map[int32]pconsumer),
}

opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(*brokers, ",")...),
kgo.ConsumerGroup(*group),
kgo.ConsumeTopics(*topic),
kgo.OnPartitionsAssigned(s.assigned),
kgo.OnPartitionsRevoked(s.lost),
kgo.OnPartitionsLost(s.lost),
kgo.DisableAutoCommit(),
kgo.BlockRebalanceOnPoll(),
}

cl, err := kgo.NewClient(opts...)
if err != nil {
panic(err)
}
// Check connectivity to cluster
err = cl.Ping(context.Background())
if err != nil {
panic(err)
}

s.poll(cl)
}

func (s *splitConsume) poll(cl *kgo.Client) {
for {
fetches := cl.PollRecords(context.Background(), 10000)
if fetches.IsClientClosed() {
return
}
fetches.EachError(func(_ string, _ int32, err error) {
panic(err)
})
fetches.EachTopic(func(t kgo.FetchTopic) {
s.mu.Lock()
tconsumers := s.consumers[t.Topic]
s.mu.Unlock()
if tconsumers == nil {
return
}
t.EachPartition(func(p kgo.FetchPartition) {
pc, ok := tconsumers[p.Partition]
if !ok {
return
}
select {
case pc.recs <- p.Records:
case <-pc.quit:
}
})
})
s.mu.Lock()
twmb (Owner):

If I'm reading this correctly, this lock is to ensure onLost is not running?

twmb (Owner):

From the main comments, yes. Perhaps a comment here on that would be good.

brunsgaard (Contributor) commented on Mar 1, 2022:

onLost uses <-pc.done, which ensures the records are committed before the goroutine returns, and thus before onLost returns. That way work in progress is committed before the rebalance is allowed to proceed. Does that make sense?

twmb (Owner):

I'm planning to merge this, then add a followup commit that adds a few more comments and potentially avoids EachTopic (which internally allocates a map). I'm pretty sure on the new API naming.

brunsgaard (Contributor):

We are unsure whether that makes sense ourselves. Please advise :)

twmb (Owner):

I'll merge this now and touch up a few things (file reorganization now that there are two similar examples, potentially avoid FetchTopics, add an example using AutoCommitMarks). Will ping here once done.

twmb (Owner):

I see why this lock is necessary, and I think I may switch BlockRebalanceOnPoll to also block polling while user-provided OnAssigned / OnRevoked / OnLost are called.

brunsgaard (Contributor):

👍

twmb (Owner):

Please check https://github.com/twmb/franz-go/blob/v1.4/examples/goroutine_per_partition_consuming/manual_commit/main.go to see what this example has become :)

I think it's a good bit simpler now to avoid locking.

cl.AllowRebalance()
s.mu.Unlock()
}
}