fix(consumer): maintain ordering of offset commit requests #2947

Merged · 4 commits · Aug 7, 2024 · (diff below shows changes from 2 commits)
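Not part of the PR: a self-contained Go sketch (illustrative names only, not Sarama's API) of the invariant this fix restores. The channel stands in for the coordinator connection, which applies commit requests strictly in arrival order; holding one lock across snapshot-and-enqueue makes arrival order match offset order, while releasing it in between (the old behaviour) lets a stale commit arrive after a newer one.

package main

import (
	"fmt"
	"sync"
)

// sendQueue stands in for the coordinator connection: the broker applies
// commit requests strictly in the order they are enqueued here.
var sendQueue = make(chan int64, 8192)

var (
	mu         sync.Mutex
	nextOffset int64
)

// commitOrdered mirrors the fixed flushToBroker: the offset snapshot and the
// enqueue happen under one lock, so queue order always matches offset order.
func commitOrdered() {
	mu.Lock()
	nextOffset++
	sendQueue <- nextOffset
	mu.Unlock()
}

// commitRacy mirrors the old behaviour: the snapshot is taken under the lock
// but enqueued after it is released, so a higher offset can reach the queue
// first and then be overwritten by this stale one.
func commitRacy() {
	mu.Lock()
	nextOffset++
	snapshot := nextOffset
	mu.Unlock()
	sendQueue <- snapshot
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			commitOrdered() // swap in commitRacy to observe regressions
		}()
	}
	wg.Wait()
	close(sendQueue)

	var last int64
	for off := range sendQueue {
		if off < last {
			fmt.Printf("offset regression: %d arrived after %d\n", off, last)
		}
		last = off
	}
}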
28 changes: 26 additions & 2 deletions offset_manager.go
@@ -251,28 +251,52 @@ func (om *offsetManager) Commit() {
}

func (om *offsetManager) flushToBroker() {
	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	// Care needs to be taken to unlock this. Don't want to defer the unlock as this would
	// cause the lock to be held while waiting for the broker to reply.
	broker.lock.Lock()
	req := om.constructRequest()
	if req == nil {
		broker.lock.Unlock()
		return
	}
	resp, rp, err := sendOffsetCommit(broker, req)
	broker.lock.Unlock()

	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	err = handleResponsePromise(req, resp, rp, nil)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	broker.handleThrottledResponse(resp)
	om.handleResponse(broker, req, resp)
}

// sendOffsetCommit enqueues the commit request on the coordinator connection
// and returns a promise for its response, so the caller can await the reply
// after releasing the broker lock.
func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) {
	resp := new(OffsetCommitResponse)
	responseHeaderVersion := resp.headerVersion()
	promise, err := coordinator.send(req, true, responseHeaderVersion)
	if err != nil {
		return nil, nil, err
	}
	return resp, promise, nil
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version: 1,
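The hunk above splits sending from awaiting: sendOffsetCommit enqueues the request and returns a responsePromise immediately, so broker.lock is held only long enough to fix the request's place in the send queue, and handleResponsePromise blocks for the reply after the unlock. A generic sketch of that enqueue-then-await pattern, using hypothetical types (promise, inflight, and conn are illustrative, not Sarama's internals):

package main

import (
	"fmt"
	"sync"
)

// promise carries the eventual reply for one in-flight request.
type promise struct{ resp chan string }

// inflight pairs a request with the promise its reply resolves.
type inflight struct {
	req string
	p   *promise
}

// conn serializes transmission: replies are delivered to promises in the
// same order the requests were enqueued.
type conn struct {
	mu    sync.Mutex
	queue chan inflight
}

func newConn() *conn {
	c := &conn{queue: make(chan inflight, 64)}
	go func() { // stand-in broker: answer in arrival order
		for item := range c.queue {
			item.p.resp <- "ack " + item.req
		}
	}()
	return c
}

// send enqueues the request and returns a promise at once; the lock is held
// only to fix this request's position in line, never while awaiting a reply.
func (c *conn) send(req string) *promise {
	p := &promise{resp: make(chan string, 1)}
	c.mu.Lock()
	c.queue <- inflight{req, p}
	c.mu.Unlock()
	return p
}

func main() {
	c := newConn()
	p := c.send("OffsetCommit partition=0 offset=42") // enqueue under lock
	fmt.Println(<-p.resp)                             // await with no lock held
}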
110 changes: 110 additions & 0 deletions offset_manager_test.go
@@ -3,6 +3,8 @@ package sarama
import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
@@ -82,6 +84,9 @@ func TestNewOffsetManager(t *testing.T) {
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
	seedBroker.Returns(metadataResponse)
	findCoordResponse := new(FindCoordinatorResponse)
	findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
	seedBroker.Returns(findCoordResponse)
	defer seedBroker.Close()

	testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
@@ -102,6 +107,111 @@
	}
}

// Test that the correct sequence of offset commit messages is sent to a broker when
// multiple goroutines for a group are committing offsets at the same time
func TestOffsetManagerCommitSequence(t *testing.T) {
	mu := &sync.Mutex{}
	lastOffset := map[int32]int64{}
	outOfOrder := ""
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()
	seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
		"MetadataRequest": func(req *request) (rest encoderWithHeader) {
			resp := new(MetadataResponse)
			resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
			return resp
		},
		"FindCoordinatorRequest": func(req *request) (rest encoderWithHeader) {
			resp := new(FindCoordinatorResponse)
			resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
			return resp
		},
		"OffsetFetchRequest": func(r *request) (rest encoderWithHeader) {
			req := r.body.(*OffsetFetchRequest)
			resp := new(OffsetFetchResponse)
			resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{}
			for topic, partitions := range req.partitions {
				for _, partition := range partitions {
					if _, ok := resp.Blocks[topic]; !ok {
						resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{}
					}
					resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{
						Offset: 0,
						Err:    ErrNoError,
					}
				}
			}
			return resp
		},
		"OffsetCommitRequest": func(r *request) (rest encoderWithHeader) {
			req := r.body.(*OffsetCommitRequest)
			func() {
				mu.Lock()
				defer mu.Unlock()
				if outOfOrder == "" {
					for partition, offset := range req.blocks["topic"] {
						last := lastOffset[partition]
						if last > offset.offset {
							outOfOrder =
								fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
									partition, last, offset.offset)
						}
						lastOffset[partition] = offset.offset
					}
				}
			}()

			// Potentially yield, to try to avoid each goroutine running sequentially to completion
			runtime.Gosched()

			resp := new(OffsetCommitResponse)
			resp.Errors = map[string]map[int32]KError{}
			resp.Errors["topic"] = map[int32]KError{}
			for partition := range req.blocks["topic"] {
				resp.Errors["topic"][partition] = ErrNoError
			}
			return resp
		},
	})
	testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, testClient)
	om, err := NewOffsetManagerFromClient("group", testClient)
	if err != nil {
		t.Error(err)
	}
	defer safeClose(t, om)

	const numPartitions = 10
	const commitsPerPartition = 1000

	wg := &sync.WaitGroup{}
	for p := 0; p < numPartitions; p++ {
		pom, err := om.ManagePartition("topic", int32(p))
		if err != nil {
			t.Error(err)
		}

		wg.Add(1)
		go func() {
			for c := 0; c < commitsPerPartition; c++ {
				pom.MarkOffset(int64(c+1), "")
				om.Commit()
			}
			wg.Done()
		}()
	}

	wg.Wait()
	mu.Lock()
	defer mu.Unlock()
	if outOfOrder != "" {
		t.Error(outOfOrder)
	}
}

var offsetsautocommitTestTable = []struct {
	name string
	set  bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
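For completeness, a sketch of the exported API the new test exercises, as a consumer application would use it against a real cluster. The broker address, group name, and topic are placeholders, and error handling is abbreviated:

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Consumer.Offsets.AutoCommit.Enable = false // commit explicitly, as the test does

	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	om, err := sarama.NewOffsetManagerFromClient("group", client)
	if err != nil {
		log.Fatal(err)
	}
	defer om.Close()

	pom, err := om.ManagePartition("topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer pom.Close()

	// Mark the next offset to consume, then flush the commit to the group
	// coordinator. With this PR, Commit calls racing on other goroutines can
	// no longer deliver a stale offset after a newer one.
	pom.MarkOffset(42, "")
	om.Commit()
}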